Fix flaky LongBufferPoolTest

patch by Jon Meredith; reviewed by Dinesh Joshi for CASSANDRA-14790


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e07d53aa
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e07d53aa
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e07d53aa

Branch: refs/heads/cassandra-3.11
Commit: e07d53aaec94a498028d988f7d2c7ae7e6b620d0
Parents: 285153f
Author: Jon Meredith <jmeredit...@gmail.com>
Authored: Thu Oct 4 17:08:52 2018 -0600
Committer: Aleksey Yeshchenko <alek...@apple.com>
Committed: Tue Oct 23 23:22:13 2018 +0100

----------------------------------------------------------------------
 build.xml                                       |  33 +
 .../utils/memory/LongBufferPoolTest.java        | 614 ++++++++++++-------
 2 files changed, 411 insertions(+), 236 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e07d53aa/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index d7e5444..d7e6c4b 100644
--- a/build.xml
+++ b/build.xml
@@ -1345,6 +1345,14 @@
     </testmacro>
   </target>
 
+  <!-- Use this with an FQDN for test class, and a csv list of methods like 
this:
+    ant burn-testsome 
-Dtest.name=org.apache.cassandra.utils.memory.LongBufferPoolTest 
-Dtest.methods=testAllocate
+  -->
+  <target name="burn-testsome" depends="build-test" description="Execute 
specific burn unit tests" >
+    <testmacro inputdir="${test.burn.src}" timeout="${test.burn.timeout}">
+      <test name="${test.name}" methods="${test.methods}"/>
+    </testmacro>
+  </target>
   <target name="test-compression" depends="build-test" description="Execute 
unit tests with sstable compression enabled">
     <property name="compressed_yaml" 
value="${build.test.dir}/cassandra.compressed.yaml"/>
     <concat destfile="${compressed_yaml}">
@@ -1742,6 +1750,31 @@
       </java>
   </target>
 
+  <!-- run arbitrary mains in tests, for example to run the long running 
memory tests with lots of memory pressure
+      ant run-main 
-Dmainclass=org.apache.cassandra.utils.memory.LongBufferPoolTest 
-Dvmargs="-Xmx30m -XX:-UseGCOverheadLimit"
+  -->
+  <target name="run-main" depends="build-test">
+      <property name="mainclass" value="" />
+      <property name="vmargs" value="" />
+      <property name="args" value="" />
+      <java classname="${mainclass}"
+            fork="true"
+            failonerror="true">
+          <jvmarg value="-server" />
+          <jvmarg value="-ea" />
+          <jvmarg line="${vmargs}" />
+          <arg line="${args}" />
+          <classpath>
+              <path refid="cassandra.classpath" />
+              <pathelement location="${test.classes}"/>
+              <pathelement location="${test.conf}"/>
+              <fileset dir="${test.lib}">
+                  <include name="**/*.jar" />
+              </fileset>
+          </classpath>
+      </java>
+  </target>
+
   <!-- Generate IDEA project description files -->
   <target name="generate-idea-files" depends="build-test" 
description="Generate IDEA files">
     <mkdir dir=".idea"/>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e07d53aa/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java
----------------------------------------------------------------------
diff --git 
a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java 
b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java
index 17ac569..66abe5a 100644
--- a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java
+++ b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java
@@ -36,10 +36,36 @@ import org.apache.cassandra.utils.DynamicList;
 
 import static org.junit.Assert.*;
 
+/**
+ * Long BufferPool test - make sure that the BufferPool allocates and recycles
+ * ByteBuffers under heavy concurrent usage.
+ *
+ * The test creates two groups of threads
+ *
+ * - the burn producer/consumer pair that allocates 1/10 poolSize and then 
returns
+ *   all the memory to the pool. 50% is freed by the producer, 50% passed to 
the consumer thread.
+ *
+ * - a ring of worker threads that allocate buffers and either immediately 
free them,
+ *   or pass to the next worker thread for it to be freed on its behalf.  
Periodically
+ *   all memory is freed by the thread.
+ *
+ * While the burn/worker threads run, the original main thread checks that all 
of the threads are still
+ * making progress every 10s (no locking issues, or exits from assertion 
failures),
+ * and that every chunk has been freed at least once during the previous cycle 
(if that was possible).
+ *
+ * The test does not expect to survive out-of-memory errors, so needs 
sufficient heap memory
+ * for non-direct buffers and the debug tracking objects that check the 
allocated buffers.
+ * (The timing is very interesting when Xmx is lowered to increase garbage 
collection pauses, but do
+ * not set it too low).
+ */
 public class LongBufferPoolTest
 {
     private static final Logger logger = 
LoggerFactory.getLogger(LongBufferPoolTest.class);
 
+    private static final int AVG_BUFFER_SIZE = 16 << 10;
+    private static final int STDEV_BUFFER_SIZE = 10 << 10; // picked to ensure 
exceeding buffer size is rare, but occurs
+    private static final DateFormat DATE_FORMAT = new 
SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+
     @Test
     public void testAllocate() throws InterruptedException, ExecutionException
     {
@@ -73,299 +99,393 @@ public class LongBufferPoolTest
         }
     }
 
-    public void testAllocate(int threadCount, long duration, int poolSize) 
throws InterruptedException, ExecutionException
+    private static final class TestEnvironment
     {
-        final int avgBufferSize = 16 << 10;
-        final int stdevBufferSize = 10 << 10; // picked to ensure exceeding 
buffer size is rare, but occurs
-        final DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd 
HH:mm:ss");
+        final int threadCount;
+        final long duration;
+        final int poolSize;
+        final long until;
+        final CountDownLatch latch;
+        final SPSCQueue<BufferCheck>[] sharedRecycle;
+        final AtomicBoolean[] makingProgress;
+        final AtomicBoolean burnFreed;
+        final AtomicBoolean[] freedAllMemory;
+        final ExecutorService executorService;
+        final List<Future<Boolean>> threadResultFuture;
+        final int targetSizeQuanta;
+
+        TestEnvironment(int threadCount, long duration, int poolSize)
+        {
+            this.threadCount = threadCount;
+            this.duration = duration;
+            this.poolSize = poolSize;
+            until = System.nanoTime() + duration;
+            latch = new CountDownLatch(threadCount);
+            sharedRecycle = new SPSCQueue[threadCount];
+            makingProgress = new AtomicBoolean[threadCount];
+            burnFreed = new AtomicBoolean(false);
+            freedAllMemory = new AtomicBoolean[threadCount];
+            executorService = Executors.newFixedThreadPool(threadCount + 2);
+            threadResultFuture = new ArrayList<>(threadCount);
+
+            for (int i = 0; i < sharedRecycle.length; i++)
+            {
+                sharedRecycle[i] = new SPSCQueue<>();
+                makingProgress[i] = new AtomicBoolean(false);
+                freedAllMemory[i] = new AtomicBoolean(false);
+            }
 
-        System.out.println(String.format("%s - testing %d threads for %dm",
-                                         dateFormat.format(new Date()),
-                                         threadCount,
-                                         
TimeUnit.NANOSECONDS.toMinutes(duration)));
+            // Divide the poolSize across our threads, deliberately 
over-subscribing it.  Threads
+            // allocate a different amount of memory each - 1*quanta, 
2*quanta, ... N*quanta.
+            // Thread0 is always going to be a single CHUNK, then to allocate 
increasing amounts
+            // using their own algorithm the targetSize should be poolSize / 
targetSizeQuanta.
+            //
+            // This should divide double the poolSize across the working 
threads,
+            // plus CHUNK_SIZE for thread0 and 1/10 poolSize for the burn 
producer/consumer pair.
+            targetSizeQuanta = 2 * poolSize / sum1toN(threadCount - 1);
+        }
 
-        final long until = System.nanoTime() + duration;
-        final CountDownLatch latch = new CountDownLatch(threadCount);
-        final SPSCQueue<BufferCheck>[] sharedRecycle = new 
SPSCQueue[threadCount];
-        final AtomicBoolean[] makingProgress = new AtomicBoolean[threadCount];
-        for (int i = 0 ; i < sharedRecycle.length ; i++)
+        void addCheckedFuture(Future<Boolean> future)
         {
-            sharedRecycle[i] = new SPSCQueue<>();
-            makingProgress[i] = new AtomicBoolean(true);
+            threadResultFuture.add(future);
         }
 
-        ExecutorService executorService = 
Executors.newFixedThreadPool(threadCount + 2);
-        List<Future<Boolean>> ret = new ArrayList<>(threadCount);
-        long prevPoolSize = BufferPool.MEMORY_USAGE_THRESHOLD;
-        BufferPool.MEMORY_USAGE_THRESHOLD = poolSize;
-        BufferPool.DEBUG = true;
-        // sum(1..n) = n/2 * (n + 1); we set zero to CHUNK_SIZE, so have 
n=threadCount-1
-        int targetSizeQuanta = ((threadCount) * (threadCount - 1)) / 2;
-        // fix targetSizeQuanta at 1/64th our poolSize, so that we only 
consciously exceed our pool size limit
-        targetSizeQuanta = (targetSizeQuanta * poolSize) / 64;
-
+        int countStalledThreads()
         {
-            // setup some high churn allocate/deallocate, without any checking
-            final SPSCQueue<ByteBuffer> burn = new SPSCQueue<>();
-            final CountDownLatch doneAdd = new CountDownLatch(1);
-            executorService.submit(new TestUntil(until)
+            int stalledThreads = 0;
+
+            for (AtomicBoolean progress : makingProgress)
             {
-                int count = 0;
-                void testOne() throws Exception
-                {
-                    if (count * BufferPool.CHUNK_SIZE >= poolSize / 10)
-                    {
-                        if (burn.exhausted)
-                            count = 0;
-                        else
-                            Thread.yield();
-                        return;
-                    }
+                if (!progress.getAndSet(false))
+                    stalledThreads++;
+            }
+            return stalledThreads;
+        }
 
-                    ByteBuffer buffer = 
BufferPool.tryGet(BufferPool.CHUNK_SIZE);
-                    if (buffer == null)
-                    {
-                        Thread.yield();
-                        return;
-                    }
+        int countDoneThreads()
+        {
+            int doneThreads = 0;
+            for (Future<Boolean> r : threadResultFuture)
+            {
+                if (r.isDone())
+                    doneThreads++;
+            }
+            return doneThreads;
+        }
 
-                    BufferPool.put(buffer);
-                    burn.add(buffer);
-                    count++;
-                }
-                void cleanup()
-                {
-                    doneAdd.countDown();
-                }
-            });
-            executorService.submit(new TestUntil(until)
+        void assertCheckedThreadsSucceeded()
+        {
+            try
             {
-                void testOne() throws Exception
-                {
-                    ByteBuffer buffer = burn.poll();
-                    if (buffer == null)
-                    {
-                        Thread.yield();
-                        return;
-                    }
-                    BufferPool.put(buffer);
-                }
-                void cleanup()
-                {
-                    Uninterruptibles.awaitUninterruptibly(doneAdd);
-                }
-            });
+                for (Future<Boolean> r : threadResultFuture)
+                    assertTrue(r.get());
+            }
+            catch (InterruptedException ex)
+            {
+                // If interrupted while checking, restart and check everything.
+                assertCheckedThreadsSucceeded();
+            }
+            catch (ExecutionException ex)
+            {
+                fail("Checked thread threw exception: " + ex.toString());
+            }
         }
+    }
+
+    public void testAllocate(int threadCount, long duration, int poolSize) 
throws InterruptedException, ExecutionException
+    {
+        System.out.println(String.format("%s - testing %d threads for %dm",
+                                         DATE_FORMAT.format(new Date()),
+                                         threadCount,
+                                         
TimeUnit.NANOSECONDS.toMinutes(duration)));
+        long prevPoolSize = BufferPool.MEMORY_USAGE_THRESHOLD;
+        logger.info("Overriding configured 
BufferPool.MEMORY_USAGE_THRESHOLD={} and enabling BufferPool.DEBUG", poolSize);
+        BufferPool.MEMORY_USAGE_THRESHOLD = poolSize;
+        BufferPool.DEBUG = true;
+
+        TestEnvironment testEnv = new TestEnvironment(threadCount, duration, 
poolSize);
+
+        startBurnerThreads(testEnv);
 
-        for (int t = 0; t < threadCount; t++)
+        for (int threadIdx = 0; threadIdx < threadCount; threadIdx++)
+            testEnv.addCheckedFuture(startWorkerThread(testEnv, threadIdx));
+
+        while (!testEnv.latch.await(10L, TimeUnit.SECONDS))
         {
-            final int threadIdx = t;
-            final int targetSize = t == 0 ? BufferPool.CHUNK_SIZE : 
targetSizeQuanta * t;
+            int stalledThreads = testEnv.countStalledThreads();
+            int doneThreads = testEnv.countDoneThreads();
+
+            if (doneThreads == 0) // If any threads have completed, they will 
stop making progress/recycling buffers.
+            {                     // Assertion failures on the threads will 
be caught below.
+                assert stalledThreads == 0;
+                boolean allFreed = testEnv.burnFreed.getAndSet(false);
+                for (AtomicBoolean freedMemory : testEnv.freedAllMemory)
+                    allFreed = allFreed && freedMemory.getAndSet(false);
+                if (allFreed)
+                    BufferPool.assertAllRecycled();
+                else
+                    logger.info("All threads did not free all memory in this 
time slot - skipping buffer recycle check");
+            }
+        }
 
-            ret.add(executorService.submit(new TestUntil(until)
+        for (SPSCQueue<BufferCheck> queue : testEnv.sharedRecycle)
+        {
+            BufferCheck check;
+            while ( null != (check = queue.poll()) )
             {
-                final SPSCQueue<BufferCheck> shareFrom = 
sharedRecycle[threadIdx];
-                final DynamicList<BufferCheck> checks = new 
DynamicList<>((int) Math.max(1, targetSize / (1 << 10)));
-                final SPSCQueue<BufferCheck> shareTo = 
sharedRecycle[(threadIdx + 1) % threadCount];
-                final ThreadLocalRandom rand = ThreadLocalRandom.current();
-                int totalSize = 0;
-                int freeingSize = 0;
-                int size = 0;
-
-                void checkpoint()
-                {
-                    if (!makingProgress[threadIdx].get())
-                        makingProgress[threadIdx].set(true);
-                }
+                check.validate();
+                BufferPool.put(check.buffer);
+            }
+        }
 
-                void testOne() throws Exception
-                {
+        assertEquals(0, testEnv.executorService.shutdownNow().size());
 
-                    long currentTargetSize = rand.nextInt(poolSize / 1024) == 
0 ? 0 : targetSize;
-                    int spinCount = 0;
-                    while (totalSize > currentTargetSize - freeingSize)
-                    {
-                        // free buffers until we're below our target size
-                        if (checks.size() == 0)
-                        {
-                            // if we're out of buffers to free, we're waiting 
on our neighbour to free them;
-                            // first check if the consuming neighbour has 
caught up, and if so mark that free
-                            if (shareTo.exhausted)
-                            {
-                                totalSize -= freeingSize;
-                                freeingSize = 0;
-                            }
-                            else if (!recycleFromNeighbour())
-                            {
-                                if (++spinCount > 1000 && System.nanoTime() > 
until)
-                                    return;
-                                // otherwise, free one of our other 
neighbour's buffers if can; and otherwise yield
-                                Thread.yield();
-                            }
-                            continue;
-                        }
+        logger.info("Reverting BufferPool.MEMORY_USAGE_THRESHOLD={}", 
prevPoolSize);
+        BufferPool.MEMORY_USAGE_THRESHOLD = prevPoolSize;
+        BufferPool.DEBUG = false;
 
-                        // pick a random buffer, with preference going to 
earlier ones
-                        BufferCheck check = sample();
-                        checks.remove(check.listnode);
-                        check.validate();
+        testEnv.assertCheckedThreadsSucceeded();
 
-                        size = 
BufferPool.roundUpNormal(check.buffer.capacity());
-                        if (size > BufferPool.CHUNK_SIZE)
-                            size = 0;
+        System.out.println(String.format("%s - finished.",
+                                         DATE_FORMAT.format(new Date())));
+    }
 
-                        // either share to free, or free immediately
-                        if (rand.nextBoolean())
+    private Future<Boolean> startWorkerThread(TestEnvironment testEnv, final 
int threadIdx)
+    {
+        return testEnv.executorService.submit(new TestUntil(testEnv.until)
+        {
+            final int targetSize = threadIdx == 0 ? BufferPool.CHUNK_SIZE : 
testEnv.targetSizeQuanta * threadIdx;
+            final SPSCQueue<BufferCheck> shareFrom = 
testEnv.sharedRecycle[threadIdx];
+            final DynamicList<BufferCheck> checks = new DynamicList<>((int) 
Math.max(1, targetSize / (1 << 10)));
+            final SPSCQueue<BufferCheck> shareTo = 
testEnv.sharedRecycle[(threadIdx + 1) % testEnv.threadCount];
+            final ThreadLocalRandom rand = ThreadLocalRandom.current();
+            int totalSize = 0;
+            int freeingSize = 0;
+            int size = 0;
+
+            void checkpoint()
+            {
+                if (!testEnv.makingProgress[threadIdx].get())
+                    testEnv.makingProgress[threadIdx].set(true);
+            }
+
+            void testOne() throws Exception
+            {
+
+                long currentTargetSize = (rand.nextInt(testEnv.poolSize / 
1024) == 0 || !testEnv.freedAllMemory[threadIdx].get()) ? 0 : targetSize;
+                int spinCount = 0;
+                while (totalSize > currentTargetSize - freeingSize)
+                {
+                    // free buffers until we're below our target size
+                    if (checks.size() == 0)
+                    {
+                        // if we're out of buffers to free, we're waiting on 
our neighbour to free them;
+                        // first check if the consuming neighbour has caught 
up, and if so mark that free
+                        if (shareTo.exhausted)
                         {
-                            shareTo.add(check);
-                            freeingSize += size;
-                            // interleave this with potentially messing with 
the other neighbour's stuff
-                            recycleFromNeighbour();
+                            totalSize -= freeingSize;
+                            freeingSize = 0;
                         }
-                        else
+                        else if (!recycleFromNeighbour())
                         {
-                            check.validate();
-                            BufferPool.put(check.buffer);
-                            totalSize -= size;
+                            if (++spinCount > 1000 && System.nanoTime() > 
until)
+                                return;
+                            // otherwise, free one of our other neighbour's 
buffers if can; and otherwise yield
+                            Thread.yield();
                         }
+                        continue;
                     }
 
-                    // allocate a new buffer
-                    size = (int) Math.max(1, avgBufferSize + (stdevBufferSize 
* rand.nextGaussian()));
-                    if (size <= BufferPool.CHUNK_SIZE)
-                    {
-                        totalSize += BufferPool.roundUpNormal(size);
-                        allocate(size);
-                    }
-                    else if (rand.nextBoolean())
+                    // pick a random buffer, with preference going to earlier 
ones
+                    BufferCheck check = sample();
+                    checks.remove(check.listnode);
+                    check.validate();
+
+                    size = BufferPool.roundUpNormal(check.buffer.capacity());
+                    if (size > BufferPool.CHUNK_SIZE)
+                        size = 0;
+
+                    // either share to free, or free immediately
+                    if (rand.nextBoolean())
                     {
-                        allocate(size);
+                        shareTo.add(check);
+                        freeingSize += size;
+                        // interleave this with potentially messing with the 
other neighbour's stuff
+                        recycleFromNeighbour();
                     }
                     else
                     {
-                        // perform a burst allocation to exhaust all available 
memory
-                        while (totalSize < poolSize)
-                        {
-                            size = (int) Math.max(1, avgBufferSize + 
(stdevBufferSize * rand.nextGaussian()));
-                            if (size <= BufferPool.CHUNK_SIZE)
-                            {
-                                allocate(size);
-                                totalSize += BufferPool.roundUpNormal(size);
-                            }
-                        }
+                        check.validate();
+                        BufferPool.put(check.buffer);
+                        totalSize -= size;
                     }
+                }
 
-                    // validate a random buffer we have stashed
-                    checks.get(rand.nextInt(checks.size())).validate();
+                if (currentTargetSize == 0)
+                    testEnv.freedAllMemory[threadIdx].compareAndSet(false, 
true);
 
-                    // free all of our neighbour's remaining shared buffers
-                    while (recycleFromNeighbour());
+                // allocate a new buffer
+                size = (int) Math.max(1, AVG_BUFFER_SIZE + (STDEV_BUFFER_SIZE 
* rand.nextGaussian()));
+                if (size <= BufferPool.CHUNK_SIZE)
+                {
+                    totalSize += BufferPool.roundUpNormal(size);
+                    allocate(size);
                 }
-
-                void cleanup()
+                else if (rand.nextBoolean())
+                {
+                    allocate(size);
+                }
+                else
                 {
-                    while (checks.size() > 0)
+                    // perform a burst allocation to exhaust all available 
memory
+                    while (totalSize < testEnv.poolSize)
                     {
-                        BufferCheck check = checks.get(0);
-                        BufferPool.put(check.buffer);
-                        checks.remove(check.listnode);
+                        size = (int) Math.max(1, AVG_BUFFER_SIZE + 
(STDEV_BUFFER_SIZE * rand.nextGaussian()));
+                        if (size <= BufferPool.CHUNK_SIZE)
+                        {
+                            allocate(size);
+                            totalSize += BufferPool.roundUpNormal(size);
+                        }
                     }
-                    latch.countDown();
                 }
 
-                boolean recycleFromNeighbour()
+                // validate a random buffer we have stashed
+                checks.get(rand.nextInt(checks.size())).validate();
+
+                // free all of our neighbour's remaining shared buffers
+                while (recycleFromNeighbour());
+            }
+
+            void cleanup()
+            {
+                while (checks.size() > 0)
                 {
-                    BufferCheck check = shareFrom.poll();
-                    if (check == null)
-                        return false;
-                    check.validate();
+                    BufferCheck check = checks.get(0);
                     BufferPool.put(check.buffer);
-                    return true;
+                    checks.remove(check.listnode);
                 }
+                testEnv.latch.countDown();
+            }
 
-                BufferCheck allocate(int size)
-                {
-                    ByteBuffer buffer = BufferPool.get(size);
-                    assertNotNull(buffer);
-                    BufferCheck check = new BufferCheck(buffer, 
rand.nextLong());
-                    assertEquals(size, buffer.capacity());
-                    assertEquals(0, buffer.position());
-                    check.init();
-                    check.listnode = checks.append(check);
-                    return check;
-                }
+            boolean recycleFromNeighbour()
+            {
+                BufferCheck check = shareFrom.poll();
+                if (check == null)
+                    return false;
+                check.validate();
+                BufferPool.put(check.buffer);
+                return true;
+            }
 
-                BufferCheck sample()
+            BufferCheck allocate(int size)
+            {
+                ByteBuffer buffer = BufferPool.get(size);
+                assertNotNull(buffer);
+                BufferCheck check = new BufferCheck(buffer, rand.nextLong());
+                assertEquals(size, buffer.capacity());
+                assertEquals(0, buffer.position());
+                check.init();
+                check.listnode = checks.append(check);
+                return check;
+            }
+
+            BufferCheck sample()
+            {
+                // sample with preference to first elements:
+                // element at index n will be selected with likelihood (size - 
n) / sum1ToN(size)
+                int size = checks.size();
+
+                // pick a random number in the range [0, sum1toN(size))
+                int sampleRange = sum1toN(size);
+                int sampleIndex = rand.nextInt(sampleRange);
+
+                // then binary search for the N, such that [sum1ToN(N), 
sum1ToN(N+1)) contains this random number
+                int moveBy = Math.max(size / 4, 1);
+                int index = size / 2;
+                while (true)
                 {
-                    // sample with preference to first elements:
-                    // element at index n will be selected with likelihood 
(size - n) / sum1ToN(size)
-                    int size = checks.size();
-
-                    // pick a random number between 1 and sum1toN(size)
-                    int sampleRange = sum1toN(size);
-                    int sampleIndex = rand.nextInt(sampleRange);
-
-                    // then binary search for the N, such that [sum1ToN(N), 
sum1ToN(N+1)) contains this random number
-                    int moveBy = Math.max(size / 4, 1);
-                    int index = size / 2;
-                    while (true)
+                    int baseSampleIndex = sum1toN(index);
+                    int endOfSampleIndex = sum1toN(index + 1);
+                    if (sampleIndex >= baseSampleIndex)
                     {
-                        int baseSampleIndex = sum1toN(index);
-                        int endOfSampleIndex = sum1toN(index + 1);
-                        if (sampleIndex >= baseSampleIndex)
-                        {
-                            if (sampleIndex < endOfSampleIndex)
-                                break;
-                            index += moveBy;
-                        }
-                        else index -= moveBy;
-                        moveBy = Math.max(moveBy / 2, 1);
+                        if (sampleIndex < endOfSampleIndex)
+                            break;
+                        index += moveBy;
                     }
+                    else index -= moveBy;
+                    moveBy = Math.max(moveBy / 2, 1);
+                }
 
-                    // this gives us the inverse of our desired value, so just 
subtract it from the last index
-                    index = size - (index + 1);
+                // this gives us the inverse of our desired value, so just 
subtract it from the last index
+                index = size - (index + 1);
 
-                    return checks.get(index);
+                return checks.get(index);
+            }
+        });
+    }
+
+    private void startBurnerThreads(TestEnvironment testEnv)
+    {
+        // setup some high churn allocate/deallocate, without any checking
+        final SPSCQueue<ByteBuffer> burn = new SPSCQueue<>();
+        final CountDownLatch doneAdd = new CountDownLatch(1);
+        testEnv.addCheckedFuture(testEnv.executorService.submit(new 
TestUntil(testEnv.until)
+        {
+            int count = 0;
+            final ThreadLocalRandom rand = ThreadLocalRandom.current();
+            void testOne() throws Exception
+            {
+                if (count * BufferPool.CHUNK_SIZE >= testEnv.poolSize / 10)
+                {
+                    if (burn.exhausted)
+                    {
+                        count = 0;
+                        testEnv.burnFreed.compareAndSet(false, true);
+                    } else
+                    {
+                        Thread.yield();
+                    }
+                    return;
                 }
 
-                private int sum1toN(int n)
+                ByteBuffer buffer = BufferPool.tryGet(BufferPool.CHUNK_SIZE);
+                if (buffer == null)
                 {
-                    return (n * (n + 1)) / 2;
+                    Thread.yield();
+                    return;
                 }
-            }));
-        }
 
-        boolean first = true;
-        while (!latch.await(10L, TimeUnit.SECONDS))
-        {
-            if (!first)
-                BufferPool.assertAllRecycled();
-            first = false;
-            for (AtomicBoolean progress : makingProgress)
+                // 50/50 chance of returning the buffer from the producer 
thread, or
+                // pass it on to the consumer.
+                if (rand.nextBoolean())
+                    BufferPool.put(buffer);
+                else
+                    burn.add(buffer);
+
+                count++;
+            }
+            void cleanup()
             {
-                assert progress.get();
-                progress.set(false);
+                doneAdd.countDown();
             }
-        }
-
-        for (SPSCQueue<BufferCheck> queue : sharedRecycle)
+        }));
+        testEnv.threadResultFuture.add(testEnv.executorService.submit(new 
TestUntil(testEnv.until)
         {
-            BufferCheck check;
-            while ( null != (check = queue.poll()) )
+            void testOne() throws Exception
             {
-                check.validate();
-                BufferPool.put(check.buffer);
+                ByteBuffer buffer = burn.poll();
+                if (buffer == null)
+                {
+                    Thread.yield();
+                    return;
+                }
+                BufferPool.put(buffer);
             }
-        }
-
-        assertEquals(0, executorService.shutdownNow().size());
-
-        BufferPool.MEMORY_USAGE_THRESHOLD = prevPoolSize;
-        for (Future<Boolean> r : ret)
-            assertTrue(r.get());
-
-        System.out.println(String.format("%s - finished.",
-                                         dateFormat.format(new Date())));
+            void cleanup()
+            {
+                Uninterruptibles.awaitUninterruptibly(doneAdd);
+            }
+        }));
     }
 
     static abstract class TestUntil implements Callable<Boolean>
@@ -399,6 +519,14 @@ public class LongBufferPoolTest
                 ex.printStackTrace();
                 return false;
             }
+            catch (Throwable tr) // for java.lang.OutOfMemoryError
+            {
+                logger.error("Got throwable {}, current chunk {}",
+                             tr.getMessage(),
+                             BufferPool.currentChunk());
+                tr.printStackTrace();
+                return false;
+            }
             finally
             {
                 cleanup();
@@ -407,9 +535,19 @@ public class LongBufferPoolTest
         }
     }
 
-    public static void main(String[] args) throws InterruptedException, 
ExecutionException
+    public static void main(String[] args)
     {
-        new 
LongBufferPoolTest().testAllocate(Runtime.getRuntime().availableProcessors(), 
TimeUnit.HOURS.toNanos(2L), 16 << 20);
+        try
+        {
+            new 
LongBufferPoolTest().testAllocate(Runtime.getRuntime().availableProcessors(),
+                                                  TimeUnit.HOURS.toNanos(2L), 
16 << 20);
+            System.exit(0);
+        }
+        catch (Throwable tr)
+        {
+            System.out.println(String.format("Test failed - %s", 
tr.getMessage()));
+            System.exit(1); // Force exit so that non-daemon threads like 
REQUEST-SCHEDULER do not hang the process on failure
+        }
     }
 
     /**
@@ -451,4 +589,8 @@ public class LongBufferPoolTest
         }
     }
 
-}
\ No newline at end of file
+    private static int sum1toN(int n)
+    {
+        return (n * (n + 1)) / 2;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to