Cleanup test that recursed infinitely due to failure in precheckin

Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/cd591f7f
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/cd591f7f
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/cd591f7f

Branch: refs/heads/feature/GEODE-2632-17
Commit: cd591f7f4155d498e92755c14654d71965e26a74
Parents: 30ba91e
Author: Kirk Lund <kl...@apache.org>
Authored: Wed May 24 16:40:29 2017 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Wed May 24 16:40:29 2017 -0700

----------------------------------------------------------------------
 .../cache/ha/BlockingHARegionJUnitTest.java     | 488 +++++++++----------
 .../SerializableErrorCollector.java             |  24 +
 2 files changed, 242 insertions(+), 270 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/cd591f7f/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionJUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionJUnitTest.java
index d0f5793..3c1adc3 100755
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionJUnitTest.java
@@ -14,76 +14,114 @@
  */
 package org.apache.geode.internal.cache.ha;
 
+import static java.util.concurrent.TimeUnit.*;
 import static org.apache.geode.distributed.ConfigurationProperties.*;
+import static org.apache.geode.internal.cache.ha.HARegionQueue.*;
 import static org.junit.Assert.*;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
 
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionFactory;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.test.dunit.ThreadUtils;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
 import org.apache.geode.test.junit.categories.IntegrationTest;
+import 
org.apache.geode.test.junit.rules.serializable.SerializableErrorCollector;
 
+/**
+ * Integration tests for Blocking HARegionQueue.
+ *
+ * <p>
+ * #40314: Filled up queue causes all publishers to block
+ *
+ * <p>
+ * #37627: In case of out of order messages, (sequence Id violation), in spite 
of HARQ not full, the capacity (putPermits) of the HARQ exhausted.
+ */
 @Category({IntegrationTest.class, ClientSubscriptionTest.class})
 public class BlockingHARegionJUnitTest {
 
-  private static InternalCache cache = null;
+  public static final String REGION = "BlockingHARegionJUnitTest_Region";
+  private static final long THREAD_TIMEOUT = 2 * 60 * 1000;
+
+  private final Object numberForThreadsLock = new Object();
+  private int numberForDoPuts;
+  private int numberForDoTakes;
+
+  volatile boolean stopThreads;
 
-  /** boolean to record an exception occurence in another thread **/
-  private static volatile boolean exceptionOccurred = false;
-  /** StringBuffer to store the exception **/
-  private static StringBuffer exceptionString = new StringBuffer();
-  /** boolen to quit the for loop **/
-  private static volatile boolean quitForLoop = false;
+  private InternalCache cache;
+  private HARegionQueueAttributes queueAttributes;
+  private List<Thread> threads;
+  private ThreadGroup threadGroup;
+
+  @Rule
+  public SerializableErrorCollector errorCollector = new 
SerializableErrorCollector();
 
   @Before
   public void setUp() throws Exception {
-    Properties props = new Properties();
-    props.setProperty(MCAST_PORT, "0");
-    if (cache != null) {
-      cache.close(); // fault tolerance
+    synchronized (this.numberForThreadsLock) {
+      this.numberForDoPuts = 0;
+      this.numberForDoTakes = 0;
+    }
+
+    this.stopThreads = false;
+    this.threads = new ArrayList<>();
+    this.threadGroup = new ThreadGroup(getClass().getSimpleName()) {
+      @Override
+      public void uncaughtException(Thread t, Throwable e) {
+        errorCollector.addError(e);
+      }
+    };
+
+    this.queueAttributes = new HARegionQueueAttributes();
+
+    Properties config = new Properties();
+    config.setProperty(MCAST_PORT, "0");
+
+    this.cache = (InternalCache) 
CacheFactory.create(DistributedSystem.connect(config));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      this.stopThreads = true;
+      for (Thread thread : this.threads) {
+        thread.interrupt();
+        ThreadUtils.join(thread, THREAD_TIMEOUT);
+      }
+    } finally {
+      if (this.cache != null) {
+        this.cache.close();
+      }
     }
-    cache = (InternalCache) 
CacheFactory.create(DistributedSystem.connect(props));
   }
 
   /**
-   * This test has a scenario where the HAReqionQueue capacity is just 1. 
There will be two thread.
+   * This test has a scenario where the HARegionQueue capacity is just 1. 
There will be two thread.
    * One doing a 1000 puts and the other doing a 1000 takes. The validation 
for this test is that it
    * should not encounter any exceptions
    */
   @Test
   public void testBoundedPuts() throws Exception {
-    exceptionOccurred = false;
-    HARegionQueueAttributes harqa = new HARegionQueueAttributes();
-    harqa.setBlockingQueueCapacity(1);
-    HARegionQueue hrq = 
HARegionQueue.getHARegionQueueInstance("BlockingHARegionJUnitTest_Region",
-        cache, harqa, HARegionQueue.BLOCKING_HA_QUEUE, false);
-    hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for 
primary only.
-    Thread thread1 = new DoPuts(hrq, 1000);
-    Thread thread2 = new DoTake(hrq, 1000);
-
-    thread1.start();
-    thread2.start();
-
-    ThreadUtils.join(thread1, 30 * 1000);
-    ThreadUtils.join(thread2, 30 * 1000);
-
-    if (exceptionOccurred) {
-      fail(" Test failed due to " + exceptionString);
-    }
+    this.queueAttributes.setBlockingQueueCapacity(1);
+    HARegionQueue hrq = getHARegionQueueInstance(REGION, this.cache, 
this.queueAttributes, BLOCKING_HA_QUEUE, false);
+    hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked 
for primary only
 
-    cache.close();
+    startDoPuts(hrq, 1000);
+    startDoTakes(hrq, 1000);
   }
 
   /**
@@ -96,62 +134,22 @@ public class BlockingHARegionJUnitTest {
    */
   @Test
   public void testPutBeingBlocked() throws Exception {
-    exceptionOccurred = false;
-    quitForLoop = false;
-    HARegionQueueAttributes harqa = new HARegionQueueAttributes();
-    harqa.setBlockingQueueCapacity(1);
-    final HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance(
-        "BlockingHARegionJUnitTest_Region", cache, harqa, 
HARegionQueue.BLOCKING_HA_QUEUE, false);
-    hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for 
primary only.
-    final Thread thread1 = new DoPuts(hrq, 2);
-    thread1.start();
-    WaitCriterion ev = new WaitCriterion() {
-      public boolean done() {
-        return hrq.region.size() == 2;
-      }
+    this.queueAttributes.setBlockingQueueCapacity(1);
+    HARegionQueue hrq = getHARegionQueueInstance(REGION, this.cache, 
this.queueAttributes, BLOCKING_HA_QUEUE, false);
+    hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked 
for primary only
 
-      public String description() {
-        return null;
-      }
-    };
-    Wait.waitForCriterion(ev, 1000, 200, true);
-    assertTrue(thread1.isAlive()); // thread should still be alive (in wait 
state)
-
-    Thread thread2 = new DoTake(hrq, 1);
-    thread2.start(); // start take thread
-    ev = new WaitCriterion() {
-      public boolean done() {
-        return hrq.region.size() == 3;
-      }
+    Thread doPuts = startDoPuts(hrq, 2);
 
-      public String description() {
-        return null;
-      }
-    };
-    // sleep. take will proceed and so will sleeping put
-    Wait.waitForCriterion(ev, 3 * 1000, 200, true);
+    await().until(() -> assertTrue(hrq.region.size() == 2));
 
-    // thread should have died since put should have proceeded
-    ev = new WaitCriterion() {
-      public boolean done() {
-        return !thread1.isAlive();
-      }
+    // thread should still be alive (in wait state)
+    assertTrue(doPuts.isAlive());
 
-      public String description() {
-        return "thread1 still alive";
-      }
-    };
-    Wait.waitForCriterion(ev, 30 * 1000, 1000, true);
+    startDoTakes(hrq, 1);
 
-    ThreadUtils.join(thread1, 30 * 1000); // for completeness
-    ThreadUtils.join(thread2, 30 * 1000);
-    if (exceptionOccurred) {
-      fail(" Test failed due to " + exceptionString);
-    }
-    cache.close();
+    await().until(() -> assertTrue(hrq.region.size() == 3));
   }
 
-
   /**
    * This test tests that the region capacity is never exceeded even in highly 
concurrent
    * environments. The region capacity is set to 10000. Then 5 threads start 
doing put
@@ -161,62 +159,25 @@ public class BlockingHARegionJUnitTest {
    */
   @Test
   public void testConcurrentPutsNotExceedingLimit() throws Exception {
-    exceptionOccurred = false;
-    quitForLoop = false;
-    HARegionQueueAttributes harqa = new HARegionQueueAttributes();
-    harqa.setBlockingQueueCapacity(10000);
-    final HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance(
-        "BlockingHARegionJUnitTest_Region", cache, harqa, 
HARegionQueue.BLOCKING_HA_QUEUE, false);
-    hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for 
primary only.
-    Thread thread1 = new DoPuts(hrq, 20000, 1);
-    Thread thread2 = new DoPuts(hrq, 20000, 2);
-    Thread thread3 = new DoPuts(hrq, 20000, 3);
-    Thread thread4 = new DoPuts(hrq, 20000, 4);
-    Thread thread5 = new DoPuts(hrq, 20000, 5);
-
-    thread1.start();
-    thread2.start();
-    thread3.start();
-    thread4.start();
-    thread5.start();
-
-    WaitCriterion ev = new WaitCriterion() {
-      public boolean done() {
-        return hrq.region.size() == 20000;
-      }
-
-      public String description() {
-        return null;
-      }
-    };
-    Wait.waitForCriterion(ev, 30 * 1000, 200, true);
-
-    assertTrue(thread1.isAlive());
-    assertTrue(thread2.isAlive());
-    assertTrue(thread3.isAlive());
-    assertTrue(thread4.isAlive());
-    assertTrue(thread5.isAlive());
-
-    assertTrue(hrq.region.size() == 20000);
+    this.queueAttributes.setBlockingQueueCapacity(10000);
+    HARegionQueue hrq = getHARegionQueueInstance(REGION, this.cache, 
this.queueAttributes, BLOCKING_HA_QUEUE, false);
+    hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked 
for primary only
 
-    quitForLoop = true;
-    Thread.sleep(20000);
+    Thread doPuts1 = startDoPuts(hrq, 20000, 1);
+    Thread doPuts2 = startDoPuts(hrq, 20000, 2);
+    Thread doPuts3 = startDoPuts(hrq, 20000, 3);
+    Thread doPuts4 = startDoPuts(hrq, 20000, 4);
+    Thread doPuts5 = startDoPuts(hrq, 20000, 5);
 
-    thread1.interrupt();
-    thread2.interrupt();
-    thread3.interrupt();
-    thread4.interrupt();
-    thread5.interrupt();
+    await().until(() -> assertTrue(hrq.region.size() == 20000));
 
-    Thread.sleep(2000);
+    assertTrue(doPuts1.isAlive());
+    assertTrue(doPuts2.isAlive());
+    assertTrue(doPuts3.isAlive());
+    assertTrue(doPuts4.isAlive());
+    assertTrue(doPuts5.isAlive());
 
-    ThreadUtils.join(thread1, 5 * 60 * 1000);
-    ThreadUtils.join(thread2, 5 * 60 * 1000);
-    ThreadUtils.join(thread3, 5 * 60 * 1000);
-    ThreadUtils.join(thread4, 5 * 60 * 1000);
-    ThreadUtils.join(thread5, 5 * 60 * 1000);
-
-    cache.close();
+    assertTrue(hrq.region.size() == 20000);
   }
 
   /**
@@ -226,84 +187,40 @@ public class BlockingHARegionJUnitTest {
    * state. the region size would be verified to be 20000 (10000 puts and 
10000 DACE objects). then
    * the threads are interrupted and made to quit the loop
    */
-  @Ignore("TODO: test is disabled")
+  @Ignore("Test is disabled until/if blocking queue capacity becomes a hard 
limit")
   @Test
   public void testConcurrentPutsTakesNotExceedingLimit() throws Exception {
-    exceptionOccurred = false;
-    quitForLoop = false;
-    HARegionQueueAttributes harqa = new HARegionQueueAttributes();
-    harqa.setBlockingQueueCapacity(10000);
-    final HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance(
-        "BlockingHARegionJUnitTest_Region", cache, harqa, 
HARegionQueue.BLOCKING_HA_QUEUE, false);
-    Thread thread1 = new DoPuts(hrq, 40000, 1);
-    Thread thread2 = new DoPuts(hrq, 40000, 2);
-    Thread thread3 = new DoPuts(hrq, 40000, 3);
-    Thread thread4 = new DoPuts(hrq, 40000, 4);
-    Thread thread5 = new DoPuts(hrq, 40000, 5);
-
-    Thread thread6 = new DoTake(hrq, 5000);
-    Thread thread7 = new DoTake(hrq, 5000);
-    Thread thread8 = new DoTake(hrq, 5000);
-    Thread thread9 = new DoTake(hrq, 5000);
-    Thread thread10 = new DoTake(hrq, 5000);
-
-    thread1.start();
-    thread2.start();
-    thread3.start();
-    thread4.start();
-    thread5.start();
-
-    thread6.start();
-    thread7.start();
-    thread8.start();
-    thread9.start();
-    thread10.start();
-
-    ThreadUtils.join(thread6, 30 * 1000);
-    ThreadUtils.join(thread7, 30 * 1000);
-    ThreadUtils.join(thread8, 30 * 1000);
-    ThreadUtils.join(thread9, 30 * 1000);
-    ThreadUtils.join(thread10, 30 * 1000);
-
-    WaitCriterion ev = new WaitCriterion() {
-      public boolean done() {
-        return hrq.region.size() == 20000;
-      }
-
-      public String description() {
-        return null;
-      }
-    };
-    Wait.waitForCriterion(ev, 30 * 1000, 200, true);
-
-    assertTrue(thread1.isAlive());
-    assertTrue(thread2.isAlive());
-    assertTrue(thread3.isAlive());
-    assertTrue(thread4.isAlive());
-    assertTrue(thread5.isAlive());
+    this.queueAttributes.setBlockingQueueCapacity(10000);
+    HARegionQueue hrq = getHARegionQueueInstance(REGION, this.cache, 
this.queueAttributes, BLOCKING_HA_QUEUE, false);
+    hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked 
for primary only
+
+    Thread doPuts1 = startDoPuts(hrq, 40000, 1);
+    Thread doPuts2 = startDoPuts(hrq, 40000, 2);
+    Thread doPuts3 = startDoPuts(hrq, 40000, 3);
+    Thread doPuts4 = startDoPuts(hrq, 40000, 4);
+    Thread doPuts5 = startDoPuts(hrq, 40000, 5);
+
+    Thread doTakes1 = startDoTakes(hrq, 5000);
+    Thread doTakes2 = startDoTakes(hrq, 5000);
+    Thread doTakes3 = startDoTakes(hrq, 5000);
+    Thread doTakes4 = startDoTakes(hrq, 5000);
+    Thread doTakes5 = startDoTakes(hrq, 5000);
+
+    ThreadUtils.join(doTakes1, 30 * 1000);
+    ThreadUtils.join(doTakes2, 30 * 1000);
+    ThreadUtils.join(doTakes3, 30 * 1000);
+    ThreadUtils.join(doTakes4, 30 * 1000);
+    ThreadUtils.join(doTakes5, 30 * 1000);
+
+    await().until(() -> assertTrue(hrq.region.size() == 20000));
+
+    assertTrue(doPuts1.isAlive());
+    assertTrue(doPuts2.isAlive());
+    assertTrue(doPuts3.isAlive());
+    assertTrue(doPuts4.isAlive());
+    assertTrue(doPuts5.isAlive());
 
     assertTrue(hrq.region.size() == 20000);
-
-    quitForLoop = true;
-
-    Thread.sleep(2000);
-
-    thread1.interrupt();
-    thread2.interrupt();
-    thread3.interrupt();
-    thread4.interrupt();
-    thread5.interrupt();
-
-    Thread.sleep(2000);
-
-
-    ThreadUtils.join(thread1, 30 * 1000);
-    ThreadUtils.join(thread2, 30 * 1000);
-    ThreadUtils.join(thread3, 30 * 1000);
-    ThreadUtils.join(thread4, 30 * 1000);
-    ThreadUtils.join(thread5, 30 * 1000);
-
-    cache.close();
   }
 
   /**
@@ -315,62 +232,91 @@ public class BlockingHARegionJUnitTest {
    */
   @Test
   public void testHARQMaxCapacity_Bug37627() throws Exception {
-    try {
-      exceptionOccurred = false;
-      quitForLoop = false;
-      HARegionQueueAttributes harqa = new HARegionQueueAttributes();
-      harqa.setBlockingQueueCapacity(1);
-      harqa.setExpiryTime(180);
-      final HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance(
-          "BlockingHARegionJUnitTest_Region", cache, harqa, 
HARegionQueue.BLOCKING_HA_QUEUE, false);
-      hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked 
for primary only.
-      final EventID id1 = new EventID(new byte[] {1}, 1, 2); // violation
-      final EventID ignore = new EventID(new byte[] {1}, 1, 1); //
-      final EventID id2 = new EventID(new byte[] {1}, 1, 3); //
-      Thread t1 = new Thread() {
-        public void run() {
-          try {
-            hrq.put(new ConflatableObject("key1", "value1", id1, false, 
"region1"));
-            hrq.take();
-            hrq.put(new ConflatableObject("key2", "value1", ignore, false, 
"region1"));
-            hrq.put(new ConflatableObject("key3", "value1", id2, false, 
"region1"));
-          } catch (Exception e) {
-            exceptionString.append("First Put in region queue failed");
-            exceptionOccurred = true;
-          }
+    this.queueAttributes.setBlockingQueueCapacity(1);
+    this.queueAttributes.setExpiryTime(180);
+    HARegionQueue hrq = getHARegionQueueInstance(REGION, this.cache, 
this.queueAttributes, BLOCKING_HA_QUEUE, false);
+    hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked 
for primary only
+
+    EventID event1 = new EventID(new byte[] {1}, 1, 2); // violation
+    EventID event2 = new EventID(new byte[] {1}, 1, 1); // ignored
+    EventID event3 = new EventID(new byte[] {1}, 1, 3);
+
+    newThread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          hrq.put(new ConflatableObject("key1", "value1", event1, false, 
"region1"));
+          hrq.take();
+          hrq.put(new ConflatableObject("key2", "value1", event2, false, 
"region1"));
+          hrq.put(new ConflatableObject("key3", "value1", event3, false, 
"region1"));
+        } catch (Exception e) {
+          errorCollector.addError(e);
         }
-      };
-      t1.start();
-      ThreadUtils.join(t1, 20 * 1000);
-      if (exceptionOccurred) {
-        fail(" Test failed due to " + exceptionString);
-      }
-    } finally {
-      if (cache != null) {
-        cache.close();
       }
+    });
+  }
+
+  private Thread newThread(Runnable runnable) {
+    Thread thread = new Thread(this.threadGroup, runnable);
+    this.threads.add(thread);
+    thread.start();
+    return thread;
+  }
+
+  private Thread startDoPuts(HARegionQueue haRegionQueue, int count) {
+    return startDoPuts(haRegionQueue, count, 0);
+  }
+
+  private Thread startDoPuts(HARegionQueue haRegionQueue, int count, int 
regionId) {
+    Thread thread = new DoPuts(this.threadGroup, haRegionQueue, count, 
regionId);
+    this.threads.add(thread);
+    thread.start();
+    return thread;
+  }
+
+  private Thread startDoTakes(HARegionQueue haRegionQueue, int count) {
+    Thread thread = new DoTakes(this.threadGroup, haRegionQueue, count);
+    this.threads.add(thread);
+    thread.start();
+    return thread;
+  }
+
+  private ConditionFactory await() {
+    return Awaitility.await().atMost(2, MINUTES);
+  }
+
+  int nextDoPutsThreadNum() {
+    synchronized (this.numberForThreadsLock) {
+      return numberForDoPuts++;
+    }
+  }
+
+  int nextDoTakesThreadNum() {
+    synchronized (this.numberForThreadsLock) {
+      return numberForDoTakes++;
     }
   }
 
   /**
    * class which does specified number of puts on the queue
    */
-  private static class DoPuts extends Thread {
+  private class DoPuts extends Thread {
 
-    HARegionQueue regionQueue = null;
-    final int numberOfPuts;
+    private final HARegionQueue regionQueue;
 
-    DoPuts(HARegionQueue haRegionQueue, int numberOfPuts) {
-      this.regionQueue = haRegionQueue;
-      this.numberOfPuts = numberOfPuts;
-    }
+    private final int numberOfPuts;
 
     /**
      * region id can be specified to generate Thread unique events
      */
-    int regionId = 0;
+    private final int regionId;
 
-    DoPuts(HARegionQueue haRegionQueue, int numberOfPuts, int regionId) {
+    DoPuts(ThreadGroup threadGroup, HARegionQueue haRegionQueue, int 
numberOfPuts) {
+      this(threadGroup, haRegionQueue, numberOfPuts, 0);
+    }
+
+    DoPuts(ThreadGroup threadGroup, HARegionQueue haRegionQueue, int 
numberOfPuts, int regionId) {
+      super(threadGroup, "DoPuts-" + nextDoPutsThreadNum());
       this.regionQueue = haRegionQueue;
       this.numberOfPuts = numberOfPuts;
       this.regionId = regionId;
@@ -378,19 +324,16 @@ public class BlockingHARegionJUnitTest {
 
     @Override
     public void run() {
-      for (int i = 0; i < numberOfPuts; i++) {
+      for (int i = 0; i < this.numberOfPuts; i++) {
+        if (stopThreads || Thread.currentThread().isInterrupted()) {
+          break;
+        }
         try {
           this.regionQueue.put(new ConflatableObject("" + i, "" + i,
-              new EventID(new byte[regionId], i, i), false, 
"BlockingHARegionJUnitTest_Region"));
-          if (quitForLoop) {
-            break;
-          }
-          if (Thread.currentThread().isInterrupted()) {
-            break;
-          }
+              new EventID(new byte[this.regionId], i, i), false, REGION));
         } catch (Exception e) {
-          exceptionOccurred = true;
-          exceptionString.append(" Exception occurred due to " + e);
+          errorCollector.addError(e);
+          break;
         }
       }
     }
@@ -399,24 +342,29 @@ public class BlockingHARegionJUnitTest {
   /**
    * class which does a specified number of takes
    */
-  private static class DoTake extends Thread {
+  private class DoTakes extends Thread {
 
-    final HARegionQueue regionQueue;
-    final int numberOfTakes;
+    private final HARegionQueue regionQueue;
 
-    DoTake(HARegionQueue haRegionQueue, int numberOfTakes) {
+    private final int numberOfTakes;
+
+    DoTakes(ThreadGroup threadGroup, HARegionQueue haRegionQueue, int 
numberOfTakes) {
+      super(threadGroup, "DoTakes-" + nextDoTakesThreadNum());
       this.regionQueue = haRegionQueue;
       this.numberOfTakes = numberOfTakes;
     }
 
     @Override
     public void run() {
-      for (int i = 0; i < numberOfTakes; i++) {
+      for (int i = 0; i < this.numberOfTakes; i++) {
+        if (stopThreads || Thread.currentThread().isInterrupted()) {
+          break;
+        }
         try {
           assertNotNull(this.regionQueue.take());
         } catch (Exception e) {
-          exceptionOccurred = true;
-          exceptionString.append(" Exception occurred due to " + e);
+          errorCollector.addError(e);
+          break;
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/geode/blob/cd591f7f/geode-junit/src/main/java/org/apache/geode/test/junit/rules/serializable/SerializableErrorCollector.java
----------------------------------------------------------------------
diff --git 
a/geode-junit/src/main/java/org/apache/geode/test/junit/rules/serializable/SerializableErrorCollector.java
 
b/geode-junit/src/main/java/org/apache/geode/test/junit/rules/serializable/SerializableErrorCollector.java
new file mode 100644
index 0000000..0abfdaf
--- /dev/null
+++ 
b/geode-junit/src/main/java/org/apache/geode/test/junit/rules/serializable/SerializableErrorCollector.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.test.junit.rules.serializable;
+
+import org.junit.rules.ErrorCollector;
+
+import java.io.Serializable;
+
+public class SerializableErrorCollector extends ErrorCollector implements 
Serializable {
+}

Reply via email to