Fix flaky LongBufferPoolTest

patch by Jon Meredith; reviewed by Dinesh Joshi for CASSANDRA-14790
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e07d53aa
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e07d53aa
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e07d53aa

Branch: refs/heads/trunk
Commit: e07d53aaec94a498028d988f7d2c7ae7e6b620d0
Parents: 285153f
Author: Jon Meredith <jmeredit...@gmail.com>
Authored: Thu Oct 4 17:08:52 2018 -0600
Committer: Aleksey Yeshchenko <alek...@apple.com>
Committed: Tue Oct 23 23:22:13 2018 +0100

----------------------------------------------------------------------
 build.xml                                        |  33 +
 .../utils/memory/LongBufferPoolTest.java         | 614 ++++++++++++-------
 2 files changed, 411 insertions(+), 236 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e07d53aa/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index d7e5444..d7e6c4b 100644
--- a/build.xml
+++ b/build.xml
@@ -1345,6 +1345,14 @@
       </testmacro>
   </target>

+  <!-- Use this with an FQDN for test class, and a csv list of methods like this:
+    ant burn-testsome -Dtest.name=org.apache.cassandra.utils.memory.LongBufferPoolTest -Dtest.methods=testAllocate
+  -->
+  <target name="burn-testsome" depends="build-test" description="Execute specific burn unit tests" >
+    <testmacro inputdir="${test.burn.src}" timeout="${test.burn.timeout}">
+      <test name="${test.name}" methods="${test.methods}"/>
+    </testmacro>
+  </target>
   <target name="test-compression" depends="build-test" description="Execute unit tests with sstable compression enabled">
     <property name="compressed_yaml" value="${build.test.dir}/cassandra.compressed.yaml"/>
     <concat destfile="${compressed_yaml}">
@@ -1742,6 +1750,31 @@
     </java>
   </target>

+  <!-- run arbitrary mains in tests, for example to run the long running memory tests with lots of memory pressure
+      ant run-main -Dmainclass=org.apache.cassandra.utils.memory.LongBufferPoolTest -Dvmargs="-Xmx30m -XX:-UseGCOverheadLimit"
+  -->
+  <target name="run-main" depends="build-test">
+    <property name="mainclass" value="" />
+    <property name="vmargs" value="" />
+    <property name="args" value="" />
+    <java classname="${mainclass}"
+          fork="true"
+          failonerror="true">
+      <jvmarg value="-server" />
+      <jvmarg value="-ea" />
+      <jvmarg line="${vmargs}" />
+      <arg line="${args}" />
+      <classpath>
+        <path refid="cassandra.classpath" />
+        <pathelement location="${test.classes}"/>
+        <pathelement location="${test.conf}"/>
+        <fileset dir="${test.lib}">
+          <include name="**/*.jar" />
+        </fileset>
+      </classpath>
+    </java>
+  </target>
+
   <!-- Generate IDEA project description files -->
   <target name="generate-idea-files" depends="build-test" description="Generate IDEA files">
     <mkdir dir=".idea"/>
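The comments embedded in the new targets double as usage documentation; for example, to run just the rewritten test (command taken verbatim from the comment above):

    ant burn-testsome -Dtest.name=org.apache.cassandra.utils.memory.LongBufferPoolTest -Dtest.methods=testAllocate

run-main differs from burn-testsome in that it forks a bare JVM (-server, -ea, the test classpath) around an arbitrary main() rather than going through the JUnit runner, which is what makes the long, memory-pressured runs of this test practical.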
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e07d53aa/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java
index 17ac569..66abe5a 100644
--- a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java
+++ b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java
@@ -36,10 +36,36 @@ import org.apache.cassandra.utils.DynamicList;
 
 import static org.junit.Assert.*;
 
+/**
+ * Long BufferPool test - make sure that the BufferPool allocates and recycles
+ * ByteBuffers under heavy concurrent usage.
+ *
+ * The test creates two groups of threads:
+ *
+ * - the burn producer/consumer pair that allocates 1/10 poolSize and then returns
+ *   all the memory to the pool. 50% is freed by the producer, 50% passed to the consumer thread.
+ *
+ * - a ring of worker threads that allocate buffers and either immediately free them,
+ *   or pass them to the next worker thread to be freed on their behalf. Periodically
+ *   all memory is freed by the thread.
+ *
+ * While the burn/worker threads run, the original main thread checks every 10s that all of the
+ * threads are still making progress (no locking issues, or exits from assertion failures),
+ * and that every chunk has been freed at least once during the previous cycle (if that was possible).
+ *
+ * The test does not expect to survive out-of-memory errors, so it needs sufficient heap memory
+ * for non-direct buffers and the debug tracking objects that check the allocated buffers.
+ * (The timing is very interesting when Xmx is lowered to increase garbage collection pauses, but do
+ * not set it too low).
+ */
 public class LongBufferPoolTest
 {
     private static final Logger logger = LoggerFactory.getLogger(LongBufferPoolTest.class);
 
+    private static final int AVG_BUFFER_SIZE = 16 << 10;
+    private static final int STDEV_BUFFER_SIZE = 10 << 10; // picked to ensure exceeding buffer size is rare, but occurs
+    private static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+
     @Test
     public void testAllocate() throws InterruptedException, ExecutionException
     {
@@ -73,299 +99,393 @@ public class LongBufferPoolTest
         }
     }
 
-    public void testAllocate(int threadCount, long duration, int poolSize) throws InterruptedException, ExecutionException
-    {
-        final int avgBufferSize = 16 << 10;
-        final int stdevBufferSize = 10 << 10; // picked to ensure exceeding buffer size is rare, but occurs
-        final DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
-
-        System.out.println(String.format("%s - testing %d threads for %dm",
-                                         dateFormat.format(new Date()),
-                                         threadCount,
-                                         TimeUnit.NANOSECONDS.toMinutes(duration)));
-
-        final long until = System.nanoTime() + duration;
-        final CountDownLatch latch = new CountDownLatch(threadCount);
-        final SPSCQueue<BufferCheck>[] sharedRecycle = new SPSCQueue[threadCount];
-        final AtomicBoolean[] makingProgress = new AtomicBoolean[threadCount];
-        for (int i = 0 ; i < sharedRecycle.length ; i++)
-        {
-            sharedRecycle[i] = new SPSCQueue<>();
-            makingProgress[i] = new AtomicBoolean(true);
-        }
-
-        ExecutorService executorService = Executors.newFixedThreadPool(threadCount + 2);
-        List<Future<Boolean>> ret = new ArrayList<>(threadCount);
-        long prevPoolSize = BufferPool.MEMORY_USAGE_THRESHOLD;
-        BufferPool.MEMORY_USAGE_THRESHOLD = poolSize;
-        BufferPool.DEBUG = true;
-        // sum(1..n) = n/2 * (n + 1); we set zero to CHUNK_SIZE, so have n=threadCount-1
-        int targetSizeQuanta = ((threadCount) * (threadCount - 1)) / 2;
-        // fix targetSizeQuanta at 1/64th our poolSize, so that we only consciously exceed our pool size limit
-        targetSizeQuanta = (targetSizeQuanta * poolSize) / 64;
-
-        {
-            // setup some high churn allocate/deallocate, without any checking
-            final SPSCQueue<ByteBuffer> burn = new SPSCQueue<>();
-            final CountDownLatch doneAdd = new CountDownLatch(1);
-            executorService.submit(new TestUntil(until)
-            {
-                int count = 0;
-                void testOne() throws Exception
-                {
-                    if (count * BufferPool.CHUNK_SIZE >= poolSize / 10)
-                    {
-                        if (burn.exhausted)
-                            count = 0;
-                        else
-                            Thread.yield();
-                        return;
-                    }
-
-                    ByteBuffer buffer = BufferPool.tryGet(BufferPool.CHUNK_SIZE);
-                    if (buffer == null)
-                    {
-                        Thread.yield();
-                        return;
-                    }
-
-                    BufferPool.put(buffer);
-                    burn.add(buffer);
-                    count++;
-                }
-                void cleanup()
-                {
-                    doneAdd.countDown();
-                }
-            });
-            executorService.submit(new TestUntil(until)
-            {
-                void testOne() throws Exception
-                {
-                    ByteBuffer buffer = burn.poll();
-                    if (buffer == null)
-                    {
-                        Thread.yield();
-                        return;
-                    }
-                    BufferPool.put(buffer);
-                }
-                void cleanup()
-                {
-                    Uninterruptibles.awaitUninterruptibly(doneAdd);
-                }
-            });
-        }
-
-        for (int t = 0; t < threadCount; t++)
-        {
-            final int threadIdx = t;
-            final int targetSize = t == 0 ? BufferPool.CHUNK_SIZE : targetSizeQuanta * t;
-
-            ret.add(executorService.submit(new TestUntil(until)
-            {
-                final SPSCQueue<BufferCheck> shareFrom = sharedRecycle[threadIdx];
-                final DynamicList<BufferCheck> checks = new DynamicList<>((int) Math.max(1, targetSize / (1 << 10)));
-                final SPSCQueue<BufferCheck> shareTo = sharedRecycle[(threadIdx + 1) % threadCount];
-                final ThreadLocalRandom rand = ThreadLocalRandom.current();
-                int totalSize = 0;
-                int freeingSize = 0;
-                int size = 0;
-
-                void checkpoint()
-                {
-                    if (!makingProgress[threadIdx].get())
-                        makingProgress[threadIdx].set(true);
-                }
-
-                void testOne() throws Exception
-                {
-                    long currentTargetSize = rand.nextInt(poolSize / 1024) == 0 ? 0 : targetSize;
-                    int spinCount = 0;
-                    while (totalSize > currentTargetSize - freeingSize)
-                    {
-                        // free buffers until we're below our target size
-                        if (checks.size() == 0)
-                        {
-                            // if we're out of buffers to free, we're waiting on our neighbour to free them;
-                            // first check if the consuming neighbour has caught up, and if so mark that free
-                            if (shareTo.exhausted)
-                            {
-                                totalSize -= freeingSize;
-                                freeingSize = 0;
-                            }
-                            else if (!recycleFromNeighbour())
-                            {
-                                if (++spinCount > 1000 && System.nanoTime() > until)
-                                    return;
-                                // otherwise, free one of our other neighbour's buffers if can; and otherwise yield
-                                Thread.yield();
-                            }
-                            continue;
-                        }
-
-                        // pick a random buffer, with preference going to earlier ones
-                        BufferCheck check = sample();
-                        checks.remove(check.listnode);
-                        check.validate();
-
-                        size = BufferPool.roundUpNormal(check.buffer.capacity());
-                        if (size > BufferPool.CHUNK_SIZE)
-                            size = 0;
-
-                        // either share to free, or free immediately
-                        if (rand.nextBoolean())
-                        {
-                            shareTo.add(check);
-                            freeingSize += size;
-                            // interleave this with potentially messing with the other neighbour's stuff
-                            recycleFromNeighbour();
-                        }
-                        else
-                        {
-                            check.validate();
-                            BufferPool.put(check.buffer);
-                            totalSize -= size;
-                        }
-                    }
-
-                    // allocate a new buffer
-                    size = (int) Math.max(1, avgBufferSize + (stdevBufferSize * rand.nextGaussian()));
-                    if (size <= BufferPool.CHUNK_SIZE)
-                    {
-                        totalSize += BufferPool.roundUpNormal(size);
-                        allocate(size);
-                    }
-                    else if (rand.nextBoolean())
-                    {
-                        allocate(size);
-                    }
-                    else
-                    {
-                        // perform a burst allocation to exhaust all available memory
-                        while (totalSize < poolSize)
-                        {
-                            size = (int) Math.max(1, avgBufferSize + (stdevBufferSize * rand.nextGaussian()));
-                            if (size <= BufferPool.CHUNK_SIZE)
-                            {
-                                allocate(size);
-                                totalSize += BufferPool.roundUpNormal(size);
-                            }
-                        }
-                    }
-
-                    // validate a random buffer we have stashed
-                    checks.get(rand.nextInt(checks.size())).validate();
-
-                    // free all of our neighbour's remaining shared buffers
-                    while (recycleFromNeighbour());
-                }
-
-                void cleanup()
-                {
-                    while (checks.size() > 0)
-                    {
-                        BufferCheck check = checks.get(0);
-                        BufferPool.put(check.buffer);
-                        checks.remove(check.listnode);
-                    }
-                    latch.countDown();
-                }
-
-                boolean recycleFromNeighbour()
-                {
-                    BufferCheck check = shareFrom.poll();
-                    if (check == null)
-                        return false;
-                    check.validate();
-                    BufferPool.put(check.buffer);
-                    return true;
-                }
-
-                BufferCheck allocate(int size)
-                {
-                    ByteBuffer buffer = BufferPool.get(size);
-                    assertNotNull(buffer);
-                    BufferCheck check = new BufferCheck(buffer, rand.nextLong());
-                    assertEquals(size, buffer.capacity());
-                    assertEquals(0, buffer.position());
-                    check.init();
-                    check.listnode = checks.append(check);
-                    return check;
-                }
-
-                BufferCheck sample()
-                {
-                    // sample with preference to first elements:
-                    // element at index n will be selected with likelihood (size - n) / sum1ToN(size)
-                    int size = checks.size();
-
-                    // pick a random number between 1 and sum1toN(size)
-                    int sampleRange = sum1toN(size);
-                    int sampleIndex = rand.nextInt(sampleRange);
-
-                    // then binary search for the N, such that [sum1ToN(N), sum1ToN(N+1)) contains this random number
-                    int moveBy = Math.max(size / 4, 1);
-                    int index = size / 2;
-                    while (true)
-                    {
-                        int baseSampleIndex = sum1toN(index);
-                        int endOfSampleIndex = sum1toN(index + 1);
-                        if (sampleIndex >= baseSampleIndex)
-                        {
-                            if (sampleIndex < endOfSampleIndex)
-                                break;
-                            index += moveBy;
-                        }
-                        else index -= moveBy;
-                        moveBy = Math.max(moveBy / 2, 1);
-                    }
-
-                    // this gives us the inverse of our desired value, so just subtract it from the last index
-                    index = size - (index + 1);
-
-                    return checks.get(index);
-                }
-
-                private int sum1toN(int n)
-                {
-                    return (n * (n + 1)) / 2;
-                }
-            }));
-        }
-
-        boolean first = true;
-        while (!latch.await(10L, TimeUnit.SECONDS))
-        {
-            if (!first)
-                BufferPool.assertAllRecycled();
-            first = false;
-            for (AtomicBoolean progress : makingProgress)
-            {
-                assert progress.get();
-                progress.set(false);
-            }
-        }
-
-        for (SPSCQueue<BufferCheck> queue : sharedRecycle)
-        {
-            BufferCheck check;
-            while ( null != (check = queue.poll()) )
-            {
-                check.validate();
-                BufferPool.put(check.buffer);
-            }
-        }
-
-        assertEquals(0, executorService.shutdownNow().size());
-
-        BufferPool.MEMORY_USAGE_THRESHOLD = prevPoolSize;
-        for (Future<Boolean> r : ret)
-            assertTrue(r.get());
-
-        System.out.println(String.format("%s - finished.",
-                                         dateFormat.format(new Date())));
-    }
 
+    private static final class TestEnvironment
+    {
+        final int threadCount;
+        final long duration;
+        final int poolSize;
+        final long until;
+        final CountDownLatch latch;
+        final SPSCQueue<BufferCheck>[] sharedRecycle;
+        final AtomicBoolean[] makingProgress;
+        final AtomicBoolean burnFreed;
+        final AtomicBoolean[] freedAllMemory;
+        final ExecutorService executorService;
+        final List<Future<Boolean>> threadResultFuture;
+        final int targetSizeQuanta;
+
+        TestEnvironment(int threadCount, long duration, int poolSize)
+        {
+            this.threadCount = threadCount;
+            this.duration = duration;
+            this.poolSize = poolSize;
+            until = System.nanoTime() + duration;
+            latch = new CountDownLatch(threadCount);
+            sharedRecycle = new SPSCQueue[threadCount];
+            makingProgress = new AtomicBoolean[threadCount];
+            burnFreed = new AtomicBoolean(false);
+            freedAllMemory = new AtomicBoolean[threadCount];
+            executorService = Executors.newFixedThreadPool(threadCount + 2);
+            threadResultFuture = new ArrayList<>(threadCount);
+
+            for (int i = 0; i < sharedRecycle.length; i++)
+            {
+                sharedRecycle[i] = new SPSCQueue<>();
+                makingProgress[i] = new AtomicBoolean(false);
+                freedAllMemory[i] = new AtomicBoolean(false);
+            }
+
+            // Divide the poolSize across our threads, deliberately over-subscribing it.  Threads
+            // allocate a different amount of memory each - 1*quanta, 2*quanta, ... N*quanta.
+            // Thread0 is always going to be a single CHUNK; the remaining threads allocate
+            // increasing multiples of targetSizeQuanta, with thread i targeting i * quanta.
+            //
+            // This should divide double the poolSize across the working threads,
+            // plus CHUNK_SIZE for thread0 and 1/10 poolSize for the burn producer/consumer pair.
+            targetSizeQuanta = 2 * poolSize / sum1toN(threadCount - 1);
+        }
+
+        void addCheckedFuture(Future<Boolean> future)
+        {
+            threadResultFuture.add(future);
+        }
+
+        int countStalledThreads()
+        {
+            int stalledThreads = 0;
+
+            for (AtomicBoolean progress : makingProgress)
+            {
+                if (!progress.getAndSet(false))
+                    stalledThreads++;
+            }
+            return stalledThreads;
+        }
+
+        int countDoneThreads()
+        {
+            int doneThreads = 0;
+            for (Future<Boolean> r : threadResultFuture)
+            {
+                if (r.isDone())
+                    doneThreads++;
+            }
+            return doneThreads;
+        }
+
+        void assertCheckedThreadsSucceeded()
+        {
+            try
+            {
+                for (Future<Boolean> r : threadResultFuture)
+                    assertTrue(r.get());
+            }
+            catch (InterruptedException ex)
+            {
+                // If interrupted while checking, restart and check everything.
+                assertCheckedThreadsSucceeded();
+            }
+            catch (ExecutionException ex)
+            {
+                fail("Checked thread threw exception: " + ex.toString());
+            }
+        }
+    }
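An aside on the constructor's sizing arithmetic: it is easy to sanity-check in isolation. A minimal standalone sketch (not part of the patch; the thread count is illustrative, though 16 << 20 matches what main() passes below):

    // Sketch only: mirrors TestEnvironment's over-subscription math.
    public class QuantaArithmetic
    {
        static int sum1toN(int n) { return (n * (n + 1)) / 2; }

        public static void main(String[] args)
        {
            int poolSize = 16 << 20;   // 16MiB, the value LongBufferPoolTest.main() uses
            int threadCount = 5;       // illustrative: thread0 + 4 sized workers
            int quanta = 2 * poolSize / sum1toN(threadCount - 1);
            long workerTotal = 0;
            for (int threadIdx = 1; threadIdx < threadCount; threadIdx++)
                workerTotal += (long) quanta * threadIdx; // worker targetSize, as in startWorkerThread()
            // workerTotal comes to ~2 * poolSize, so the sized workers alone demand double
            // the pool, before adding thread0's CHUNK_SIZE and the burn pair's 1/10 poolSize.
            System.out.printf("quanta=%d workerTotal=%d poolSize=%d%n", quanta, workerTotal, poolSize);
        }
    }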
+
+    public void testAllocate(int threadCount, long duration, int poolSize) throws InterruptedException, ExecutionException
+    {
+        System.out.println(String.format("%s - testing %d threads for %dm",
+                                         DATE_FORMAT.format(new Date()),
+                                         threadCount,
+                                         TimeUnit.NANOSECONDS.toMinutes(duration)));
+        long prevPoolSize = BufferPool.MEMORY_USAGE_THRESHOLD;
+        logger.info("Overriding configured BufferPool.MEMORY_USAGE_THRESHOLD={} and enabling BufferPool.DEBUG", poolSize);
+        BufferPool.MEMORY_USAGE_THRESHOLD = poolSize;
+        BufferPool.DEBUG = true;
+
+        TestEnvironment testEnv = new TestEnvironment(threadCount, duration, poolSize);
+
+        startBurnerThreads(testEnv);
+
+        for (int threadIdx = 0; threadIdx < threadCount; threadIdx++)
+            testEnv.addCheckedFuture(startWorkerThread(testEnv, threadIdx));
+
+        while (!testEnv.latch.await(10L, TimeUnit.SECONDS))
+        {
+            int stalledThreads = testEnv.countStalledThreads();
+            int doneThreads = testEnv.countDoneThreads();
+
+            if (doneThreads == 0) // If any threads have completed, they will stop making progress/recycling buffers.
+            {                     // Assertion failures on the threads will be caught below.
+                assert stalledThreads == 0;
+                boolean allFreed = testEnv.burnFreed.getAndSet(false);
+                for (AtomicBoolean freedMemory : testEnv.freedAllMemory)
+                    allFreed = allFreed && freedMemory.getAndSet(false);
+                if (allFreed)
+                    BufferPool.assertAllRecycled();
+                else
+                    logger.info("All threads did not free all memory in this time slot - skipping buffer recycle check");
+            }
+        }
+
+        for (SPSCQueue<BufferCheck> queue : testEnv.sharedRecycle)
+        {
+            BufferCheck check;
+            while ( null != (check = queue.poll()) )
+            {
+                check.validate();
+                BufferPool.put(check.buffer);
+            }
+        }
+
+        assertEquals(0, testEnv.executorService.shutdownNow().size());
+
+        logger.info("Reverting BufferPool.MEMORY_USAGE_THRESHOLD={}", prevPoolSize);
+        BufferPool.MEMORY_USAGE_THRESHOLD = prevPoolSize;
+        BufferPool.DEBUG = false;
+
+        testEnv.assertCheckedThreadsSucceeded();
+
+        System.out.println(String.format("%s - finished.",
+                                         DATE_FORMAT.format(new Date())));
+    }
+
+    private Future<Boolean> startWorkerThread(TestEnvironment testEnv, final int threadIdx)
+    {
+        return testEnv.executorService.submit(new TestUntil(testEnv.until)
+        {
+            final int targetSize = threadIdx == 0 ? BufferPool.CHUNK_SIZE : testEnv.targetSizeQuanta * threadIdx;
+            final SPSCQueue<BufferCheck> shareFrom = testEnv.sharedRecycle[threadIdx];
+            final DynamicList<BufferCheck> checks = new DynamicList<>((int) Math.max(1, targetSize / (1 << 10)));
+            final SPSCQueue<BufferCheck> shareTo = testEnv.sharedRecycle[(threadIdx + 1) % testEnv.threadCount];
+            final ThreadLocalRandom rand = ThreadLocalRandom.current();
+            int totalSize = 0;
+            int freeingSize = 0;
+            int size = 0;
+
+            void checkpoint()
+            {
+                if (!testEnv.makingProgress[threadIdx].get())
+                    testEnv.makingProgress[threadIdx].set(true);
+            }
+
+            void testOne() throws Exception
+            {
+                long currentTargetSize = (rand.nextInt(testEnv.poolSize / 1024) == 0 || !testEnv.freedAllMemory[threadIdx].get()) ? 0 : targetSize;
+                int spinCount = 0;
+                while (totalSize > currentTargetSize - freeingSize)
+                {
+                    // free buffers until we're below our target size
+                    if (checks.size() == 0)
+                    {
+                        // if we're out of buffers to free, we're waiting on our neighbour to free them;
+                        // first check if the consuming neighbour has caught up, and if so mark that free
+                        if (shareTo.exhausted)
+                        {
+                            totalSize -= freeingSize;
+                            freeingSize = 0;
+                        }
+                        else if (!recycleFromNeighbour())
+                        {
+                            if (++spinCount > 1000 && System.nanoTime() > until)
+                                return;
+                            // otherwise, free one of our other neighbour's buffers if we can; and otherwise yield
+                            Thread.yield();
+                        }
+                        continue;
+                    }
+
+                    // pick a random buffer, with preference going to earlier ones
+                    BufferCheck check = sample();
+                    checks.remove(check.listnode);
+                    check.validate();
+
+                    size = BufferPool.roundUpNormal(check.buffer.capacity());
+                    if (size > BufferPool.CHUNK_SIZE)
+                        size = 0;
+
+                    // either share to free, or free immediately
+                    if (rand.nextBoolean())
+                    {
+                        shareTo.add(check);
+                        freeingSize += size;
+                        // interleave this with potentially messing with the other neighbour's stuff
+                        recycleFromNeighbour();
+                    }
+                    else
+                    {
+                        check.validate();
+                        BufferPool.put(check.buffer);
+                        totalSize -= size;
+                    }
+                }
+
+                if (currentTargetSize == 0)
+                    testEnv.freedAllMemory[threadIdx].compareAndSet(false, true);
+
+                // allocate a new buffer
+                size = (int) Math.max(1, AVG_BUFFER_SIZE + (STDEV_BUFFER_SIZE * rand.nextGaussian()));
+                if (size <= BufferPool.CHUNK_SIZE)
+                {
+                    totalSize += BufferPool.roundUpNormal(size);
+                    allocate(size);
+                }
+                else if (rand.nextBoolean())
+                {
+                    allocate(size);
+                }
+                else
+                {
+                    // perform a burst allocation to exhaust all available memory
+                    while (totalSize < testEnv.poolSize)
+                    {
+                        size = (int) Math.max(1, AVG_BUFFER_SIZE + (STDEV_BUFFER_SIZE * rand.nextGaussian()));
+                        if (size <= BufferPool.CHUNK_SIZE)
+                        {
+                            allocate(size);
+                            totalSize += BufferPool.roundUpNormal(size);
+                        }
+                    }
+                }
+
+                // validate a random buffer we have stashed
+                checks.get(rand.nextInt(checks.size())).validate();
+
+                // free all of our neighbour's remaining shared buffers
+                while (recycleFromNeighbour());
+            }
+
+            void cleanup()
+            {
+                while (checks.size() > 0)
+                {
+                    BufferCheck check = checks.get(0);
+                    BufferPool.put(check.buffer);
+                    checks.remove(check.listnode);
+                }
+                testEnv.latch.countDown();
+            }
+
+            boolean recycleFromNeighbour()
+            {
+                BufferCheck check = shareFrom.poll();
+                if (check == null)
+                    return false;
+                check.validate();
+                BufferPool.put(check.buffer);
+                return true;
+            }
+
+            BufferCheck allocate(int size)
+            {
+                ByteBuffer buffer = BufferPool.get(size);
+                assertNotNull(buffer);
+                BufferCheck check = new BufferCheck(buffer, rand.nextLong());
+                assertEquals(size, buffer.capacity());
+                assertEquals(0, buffer.position());
+                check.init();
+                check.listnode = checks.append(check);
+                return check;
+            }
+
+            BufferCheck sample()
+            {
+                // sample with preference to first elements:
+                // element at index n will be selected with likelihood (size - n) / sum1toN(size)
+                int size = checks.size();
+
+                // pick a random number between 1 and sum1toN(size)
+                int sampleRange = sum1toN(size);
+                int sampleIndex = rand.nextInt(sampleRange);
+
+                // then binary search for the N, such that [sum1toN(N), sum1toN(N+1)) contains this random number
+                int moveBy = Math.max(size / 4, 1);
+                int index = size / 2;
+                while (true)
+                {
+                    int baseSampleIndex = sum1toN(index);
+                    int endOfSampleIndex = sum1toN(index + 1);
+                    if (sampleIndex >= baseSampleIndex)
+                    {
+                        if (sampleIndex < endOfSampleIndex)
+                            break;
+                        index += moveBy;
+                    }
+                    else index -= moveBy;
+                    moveBy = Math.max(moveBy / 2, 1);
+                }
+
+                // this gives us the inverse of our desired value, so just subtract it from the last index
+                index = size - (index + 1);
+
+                return checks.get(index);
+            }
+        });
+    }
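sample() deliberately favours longer-lived buffers: index n is drawn with likelihood (size - n) / sum1toN(size) by picking a uniform point under the cumulative weights and inverting. A compact sketch of the same distribution, substituting a plain linear scan for the patch's binary search (illustrative only, not part of the patch):

    import java.util.concurrent.ThreadLocalRandom;

    public class WeightedSampling
    {
        static int sum1toN(int n) { return (n * (n + 1)) / 2; }

        // Returns an index in [0, size); index n comes back with likelihood
        // (size - n) / sum1toN(size), the distribution sample() implements.
        static int sample(int size)
        {
            int sampleIndex = ThreadLocalRandom.current().nextInt(sum1toN(size));
            int index = 0;
            while (sampleIndex >= sum1toN(index + 1)) // find n with sampleIndex in [sum1toN(n), sum1toN(n+1))
                index++;
            // bucket widths grow with index, so invert to hand the wide buckets to early elements
            return size - (index + 1);
        }

        public static void main(String[] args)
        {
            int[] counts = new int[5];
            for (int i = 0; i < 150000; i++)
                counts[sample(5)]++;
            for (int n = 0; n < 5; n++) // expect roughly 5:4:3:2:1 across indices 0..4
                System.out.printf("index %d: %d%n", n, counts[n]);
        }
    }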
+
+    private void startBurnerThreads(TestEnvironment testEnv)
+    {
+        // setup some high churn allocate/deallocate, without any checking
+        final SPSCQueue<ByteBuffer> burn = new SPSCQueue<>();
+        final CountDownLatch doneAdd = new CountDownLatch(1);
+        testEnv.addCheckedFuture(testEnv.executorService.submit(new TestUntil(testEnv.until)
+        {
+            int count = 0;
+            final ThreadLocalRandom rand = ThreadLocalRandom.current();
+            void testOne() throws Exception
+            {
+                if (count * BufferPool.CHUNK_SIZE >= testEnv.poolSize / 10)
+                {
+                    if (burn.exhausted)
+                    {
+                        count = 0;
+                        testEnv.burnFreed.compareAndSet(false, true);
+                    }
+                    else
+                    {
+                        Thread.yield();
+                    }
+                    return;
+                }
+
+                ByteBuffer buffer = BufferPool.tryGet(BufferPool.CHUNK_SIZE);
+                if (buffer == null)
+                {
+                    Thread.yield();
+                    return;
+                }
+
+                // 50/50 chance of returning the buffer from the producer thread, or
+                // passing it on to the consumer.
+                if (rand.nextBoolean())
+                    BufferPool.put(buffer);
+                else
+                    burn.add(buffer);
+
+                count++;
+            }
+            void cleanup()
+            {
+                doneAdd.countDown();
+            }
+        }));
+        testEnv.threadResultFuture.add(testEnv.executorService.submit(new TestUntil(testEnv.until)
+        {
+            void testOne() throws Exception
+            {
+                ByteBuffer buffer = burn.poll();
+                if (buffer == null)
+                {
+                    Thread.yield();
+                    return;
+                }
+                BufferPool.put(buffer);
+            }
+            void cleanup()
+            {
+                Uninterruptibles.awaitUninterruptibly(doneAdd);
+            }
+        }));
+    }
 
     static abstract class TestUntil implements Callable<Boolean>
@@ -399,6 +519,14 @@ public class LongBufferPoolTest
             ex.printStackTrace();
             return false;
         }
+        catch (Throwable tr) // for java.lang.OutOfMemoryError
+        {
+            logger.error("Got throwable {}, current chunk {}",
+                         tr.getMessage(),
+                         BufferPool.currentChunk());
+            tr.printStackTrace();
+            return false;
+        }
         finally
         {
             cleanup();
@@ -407,9 +535,19 @@ public class LongBufferPoolTest
         }
     }
 
-    public static void main(String[] args) throws InterruptedException, ExecutionException
+    public static void main(String[] args)
     {
-        new LongBufferPoolTest().testAllocate(Runtime.getRuntime().availableProcessors(), TimeUnit.HOURS.toNanos(2L), 16 << 20);
+        try
+        {
+            new LongBufferPoolTest().testAllocate(Runtime.getRuntime().availableProcessors(),
+                                                  TimeUnit.HOURS.toNanos(2L), 16 << 20);
+            System.exit(0);
+        }
+        catch (Throwable tr)
+        {
+            System.out.println(String.format("Test failed - %s", tr.getMessage()));
+            System.exit(1); // Force exit so that non-daemon threads like REQUEST-SCHEDULER do not hang the process on failure
+        }
     }
 
     /**
@@ -451,4 +589,8 @@ public class LongBufferPoolTest
         }
     }
-}
\ No newline at end of file
+
+    private static int sum1toN(int n)
+    {
+        return (n * (n + 1)) / 2;
+    }
+}
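As the class javadoc notes, the timing becomes most interesting when Xmx is lowered to stretch garbage collection pauses; with the new run-main target that experiment is the one-liner already given in the build.xml comment:

    ant run-main -Dmainclass=org.apache.cassandra.utils.memory.LongBufferPoolTest -Dvmargs="-Xmx30m -XX:-UseGCOverheadLimit"

Because main() now terminates with an explicit System.exit() status, a calling script can detect failure from the exit code instead of waiting on non-daemon threads that would previously hang the process.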