Repository: geode Updated Branches: refs/heads/develop 462ebb032 -> a76aaf0c0
Removing unused Collaboration lock class and its unit tests This class isn't used in Geode and isn't a public API. Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/a76aaf0c Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/a76aaf0c Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/a76aaf0c Branch: refs/heads/develop Commit: a76aaf0c02e2c4a32d7c001073f1ecc316470b60 Parents: 462ebb0 Author: Bruce Schuchardt <bschucha...@pivotal.io> Authored: Tue Dec 27 14:57:21 2016 -0800 Committer: Bruce Schuchardt <bschucha...@pivotal.io> Committed: Tue Dec 27 14:57:21 2016 -0800 ---------------------------------------------------------------------- .../internal/locks/Collaboration.java | 454 -------------- .../internal/locks/CollaborationJUnitTest.java | 615 ------------------- 2 files changed, 1069 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/a76aaf0c/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/Collaboration.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/Collaboration.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/Collaboration.java deleted file mode 100644 index ca9ef70..0000000 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/Collaboration.java +++ /dev/null @@ -1,454 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.geode.distributed.internal.locks; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import org.apache.logging.log4j.Logger; - -import org.apache.geode.CancelCriterion; -import org.apache.geode.internal.Assert; -import org.apache.geode.internal.i18n.LocalizedStrings; -import org.apache.geode.internal.logging.LogService; - -/** - * Synchronization structure which allows multiple threads to lock the structure. Implementation is - * fair: the next waiting thread will be serviced. - * <p> - * Collaborating threads may jointly synchronize on this structure if they all agree on the same - * topic of collaboration. - * <p> - * Threads that want to change the topic will wait until the current topic has been released. - * - */ -public class Collaboration { - - private static final Logger logger = LogService.getLogger(); - - private final static Object NULL_TOPIC = null; - - /** - * The current topic of collaboration - * - * guarded.By {@link #topicsQueue} - */ - private volatile Topic currentTopic; - - /** Ordered queue of pending topics for collaboration */ - private final List topicsQueue = new ArrayList(); - - /** Map of external topic to internal wrapper object (Topic) */ - private final Map topicsMap = new HashMap(); - - private final CancelCriterion stopper; - - /** - * Constructs new stoppable instance of Collaboration which will heed an interrupt request if it - * is acceptable to the creator of the lock. - */ - public Collaboration(CancelCriterion stopper) { - this.stopper = stopper; - } - - /** - * Acquire permission to participate in the collaboration. Returns immediately if topic matches - * the current topic. Otherwise, this will block until the Collaboration has been freed by the - * threads working on the current topic. This call is interruptible. - * - * @param topicObject Object to collaborate on - * - * @throws InterruptedException if thread is interrupted - */ - public void acquire(Object topicObject) throws InterruptedException { - throw new UnsupportedOperationException( - LocalizedStrings.Collaboration_NOT_IMPLEMENTED.toLocalizedString()); - } - - /** - * Must be synchronized on this.topicsQueue... Asserts that thread is not reentering. - */ - private void assertNotRecursingTopic(Object topicObject) { - Assert.assertTrue(false, Thread.currentThread() + " attempting to lock topic " + topicObject - + " while locking topic " + this.currentTopic); - } - - /** - * Acquire permission to participate in the collaboration. Returns immediately if topic matches - * the current topic. Otherwise, this will block until the Collaboration has been freed by the - * threads working on the current topic. This call is uninterruptible. - * - * @param topic Object to collaborate on - */ - public void acquireUninterruptibly(final Object topic) { - Object topicObject = topic; - if (topicObject == null) { - topicObject = NULL_TOPIC; - } - - Topic pendingTopic = null; - synchronized (this.topicsQueue) { - // if no topic then setup and return - if (this.currentTopic == null) { - if (logger.isDebugEnabled()) { - logger.debug( - "Collaboration.acquireUninterruptibly: {}: no current topic, setting topic to {}", - this.toString(), topicObject); - } - setCurrentTopic(new Topic(topicObject)); - this.currentTopic.addThread(Thread.currentThread()); - this.topicsMap.put(topicObject, this.currentTopic); - return; - } - - else if (isCurrentTopic(topicObject)) { - // assertNotRecursingTopic(topicObject); - if (logger.isDebugEnabled()) { - logger.debug("Collaboration.acquireUninterruptibly: {}: already current topic: {}", - this.toString(), topicObject); - } - this.currentTopic.addThread(Thread.currentThread()); - return; - } - - else if (hasCurrentTopic(Thread.currentThread())) { - assertNotRecursingTopic(topicObject); - } - - // if other topic then add to pending topics and then wait - else { - pendingTopic = (Topic) this.topicsMap.get(topicObject); - if (pendingTopic == null) { - pendingTopic = new Topic(topicObject); - this.topicsMap.put(topicObject, pendingTopic); - this.topicsQueue.add(pendingTopic); - } - pendingTopic.addThread(Thread.currentThread()); - if (logger.isDebugEnabled()) { - logger.debug( - "Collaboration.acquireUninterruptibly: {}: adding pendingTopic {}; current topic is {}", - this.toString(), pendingTopic, this.currentTopic); - } - } - } // synchronized - // now await the topic change uninterruptibly... - boolean interrupted = Thread.interrupted(); - try { - awaitTopic(pendingTopic, false); - } catch (InterruptedException e) { // LOST INTERRUPT - interrupted = true; - this.stopper.checkCancelInProgress(e); - } finally { - if (interrupted) - Thread.currentThread().interrupt(); - } - } - - private void setCurrentTopic(Topic topic) { - synchronized (this.topicsQueue) { - if (this.currentTopic != null) { - synchronized (this.currentTopic) { - this.currentTopic.setCurrentTopic(false); - this.currentTopic.setOldTopic(true); - } - } - if (topic != null) { - synchronized (topic) { - topic.setCurrentTopic(true); - this.currentTopic = topic; - if (logger.isDebugEnabled()) { - logger.debug("Collaboration.setCurrentTopic: {}: new topic is {}", this.getIdentity(), - topic); - } - this.currentTopic.notifyAll(); - } - } else { - if (logger.isDebugEnabled()) { - logger.debug("Collaboration.setCurrentTopic: {} setting current topic to null", - this.toString()); - } - this.currentTopic = null; - } - } // synchronized - } - - private void awaitTopic(Topic topic, boolean interruptible) throws InterruptedException { - // wait while currentTopic exists and doesn't match my topic - boolean isDebugEnabled = logger.isDebugEnabled(); - synchronized (topic) { - while (!topic.isCurrentTopic()) { - if (topic.isOldTopic()) { - // warning: cannot call toString while under sync(topic) - Assert.assertTrue(false, - "[" + getIdentity() + ".awaitTopic] attempting to wait on old topic"); - } - boolean interrupted = Thread.interrupted(); - try { - // In order to examine the current topic, we would need to - // lock the topicsQueue and then the topic, in that order. - // No can do in this instance (wrong lock ordering) but we still want - // a sense of why we did the wait. - Topic sniff = this.currentTopic; - if (isDebugEnabled) { - logger.debug( - "Collaboration.awaitTopic: {} waiting for topic {}; current topic probably {}, which may have a thread count of {}", - getIdentity(), topic, sniff.toString(), sniff.threadCount()); - } - topic.wait(); - } catch (InterruptedException e) { - if (interruptible) - throw e; - interrupted = true; - this.stopper.checkCancelInProgress(e); - } finally { - if (interrupted) - Thread.currentThread().interrupt(); - } - } - } - - // remove this assertion after we're sure this class is working... - /* - * Assert.assertTrue(isCurrentTopic(topic.getTopicObject()), "Failed to make " + topic + - * " the topic for " + this); - */ - } - - /** - * Acquire permission to participate in the collaboration without waiting. - * - * @param topicObject Object to collaborate on - * @return true if participation in the collaboration was acquired - */ - public boolean tryAcquire(Object topicObject) { - throw new UnsupportedOperationException( - LocalizedStrings.Collaboration_NOT_IMPLEMENTED.toLocalizedString()); - } - - /** - * Acquire permission to participate in the collaboration; waits the specified timeout. - * - * @param topicObject Object to collaborate on - * @param timeout the maximum time to wait for a permit - * @param unit the time unit of the <tt>timeout</tt> argument. - * @return true if participation in the collaboration was acquired - * - * @throws InterruptedException if thread is interrupted - */ - public boolean tryAcquire(Object topicObject, long timeout, TimeUnit unit) - throws InterruptedException { - throw new UnsupportedOperationException( - LocalizedStrings.Collaboration_NOT_IMPLEMENTED.toLocalizedString()); - } - - /** - * Releases the current thread's participation in the collaboration. When the last thread involved - * in the current topic has released, a new topic can be started by any waiting threads. - * <p> - * Nothing happens if the calling thread is not participating in the current topic. - */ - public void release() { - final boolean isDebugEnabled = logger.isDebugEnabled(); - synchronized (this.topicsQueue) { - Topic topic = this.currentTopic; - if (topic == null) { - throw new IllegalStateException( - LocalizedStrings.Collaboration_COLLABORATION_HAS_NO_CURRENT_TOPIC.toLocalizedString()); - } - if (isDebugEnabled) { - logger.debug("Collaboration.release: {} releasing topic", this.toString()); - } - if (topic.isEmptyAfterRemovingThread(Thread.currentThread())) { - if (isDebugEnabled) { - logger.debug("Collaboration.release: {} released old topic {}", this.toString(), topic); - } - // current topic is done... release it - this.topicsMap.remove(topic.getTopicObject()); - if (!this.topicsQueue.isEmpty()) { - // next topic becomes the current topic - Topic nextTopic = (Topic) this.topicsQueue.remove(0); - setCurrentTopic(nextTopic); - } else { - setCurrentTopic(null); - } - } else { - if (isDebugEnabled) { - logger.debug("Collaboration.release: {} released current topic ", this.toString()); - } - } - } // synchronized - } - - /** Returns true if a collaboration topic currently exists. */ - public boolean hasCurrentTopic(Thread thread) { - synchronized (this.topicsQueue) { - if (this.currentTopic == null) - return false; - return this.currentTopic.hasThread(thread); - } - } - - /** Returns true if a collaboration topic currently exists. */ - public boolean hasCurrentTopic() { - synchronized (this.topicsQueue) { - return (this.currentTopic != null); - } - } - - /** Returns true if topic matches the current collaboration topic. */ - public boolean isCurrentTopic(Object topicObject) { - if (topicObject == null) { - throw new IllegalArgumentException( - LocalizedStrings.Collaboration_TOPIC_MUST_BE_SPECIFIED.toLocalizedString()); - } - synchronized (this.topicsQueue) { - if (this.currentTopic == null) { - return false; - } - return this.currentTopic.getTopicObject().equals(topicObject); - } - } - - @Override - public String toString() { - synchronized (this.topicsQueue) { - Topic topic = this.currentTopic; - int threadCount = 0; - if (topic != null) { - threadCount = topic.threadCount(); - } - return getIdentity() + ": topic=" + topic + " threadCount=" + threadCount; - } - } - - protected String getIdentity() { - String me = super.toString(); - return me.substring(me.lastIndexOf(".") + 1); - } - - /** - * Blocking threads will wait on this wrapper object. As threads release, they will be removed - * from the Topic. The last one removed will notifyAll on the next Topic in topicsQueue. - */ - static public class Topic { - - private boolean isCurrentTopic = false; - - private boolean isOldTopic = false; - - private final Object topicObject; - - /** - * guarded.By {@link Collaboration#topicsQueue} guarded.By this instance, <em>after</em> - * acquiring the topicsQueue - */ - private final List participatingThreads = new ArrayList(); - - /** Constructs new Topic to wrap the internal topicObject. */ - public Topic(Object topicObject) { - this.topicObject = topicObject; - } - - public boolean isCurrentTopic() { - synchronized (this) { - return this.isCurrentTopic; - } - } - - public boolean isOldTopic() { - synchronized (this) { - return this.isOldTopic; - } - } - - public Object getTopicObject() { - synchronized (this) { - return this.topicObject; - } - } - - public void setOldTopic(boolean v) { - synchronized (this) { - this.isOldTopic = v; - } - } - - public void setCurrentTopic(boolean v) { - synchronized (this) { - this.isOldTopic = v; - } - } - - /** - * Atomically removes thread and returns true if there are no more participating threads. - */ - public boolean isEmptyAfterRemovingThread(Thread thread) { - synchronized (this) { - boolean removed = this.participatingThreads.remove(thread); - if (!removed) { - Assert.assertTrue(false, "thread " + thread + " was not participating in " + this); - } - /* - * if (Collaboration.this.debugEnabled()) { Collaboration.this.log.fine("[" + - * Collaboration.this.getIdentity() + ".Topic] removed " + thread + " from " + this + - * "; remaining threads: " + this.participatingThreads); } - */ - return this.participatingThreads.isEmpty(); - } - } - - /** Adds thread to list of threads participating in this topic. */ - public void addThread(Thread thread) { - synchronized (this) { - this.participatingThreads.add(thread); - } - } - - /** Returns true if the thread was removed from participating threads. */ - public boolean removeThread(Thread thread) { - synchronized (this) { - return this.participatingThreads.remove(thread); - } - } - - /** Returns count of threads participating in this topic. */ - public int threadCount() { - synchronized (this) { - return this.participatingThreads.size(); - } - } - - /** Returns true if the thread is one of the participating threads. */ - public boolean hasThread(Thread thread) { - synchronized (this) { - return this.participatingThreads.contains(thread); - } - } - - @Override - public String toString() { - String nick = super.toString(); - nick = nick.substring(nick.lastIndexOf(".") + 1); - return nick + ": " + topicObject; - } - - } - -} - http://git-wip-us.apache.org/repos/asf/geode/blob/a76aaf0c/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/CollaborationJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/CollaborationJUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/CollaborationJUnitTest.java deleted file mode 100755 index f658dec..0000000 --- a/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/CollaborationJUnitTest.java +++ /dev/null @@ -1,615 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.distributed.internal.locks; - -import static org.junit.Assert.*; - -import java.util.*; - -import org.junit.After; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import org.apache.geode.CancelCriterion; -import org.apache.geode.LogWriter; -import org.apache.geode.SystemFailure; -import org.apache.geode.internal.logging.LocalLogWriter; -import org.apache.geode.test.dunit.ThreadUtils; -import org.apache.geode.test.dunit.Wait; -import org.apache.geode.test.dunit.WaitCriterion; -import org.apache.geode.test.junit.categories.IntegrationTest; -import org.apache.geode.internal.logging.InternalLogWriter; - -/** - * Tests the Collaboration Lock used internally by dlock service. - * - * @since GemFire 4.1.1 - */ -@Category(IntegrationTest.class) -@Ignore("Test is broken and was named CollaborationJUnitDisabledTest") -public class CollaborationJUnitTest { - - protected LogWriter log = new LocalLogWriter(InternalLogWriter.INFO_LEVEL); - protected Collaboration collaboration; - - @Before - public void setUp() throws Exception { - this.collaboration = new Collaboration(new CancelCriterion() { - @Override - public String cancelInProgress() { - return null; - } - - @Override - public RuntimeException generateCancelledException(Throwable e) { - return null; - } - }); - } - - @After - public void tearDown() throws Exception { - this.collaboration = null; - } - - protected volatile boolean flagTestBlocksUntilRelease = false; - protected volatile boolean threadBStartedTestBlocksUntilRelease = false; - - @Test - public void testBlocksUntilRelease() throws Exception { - this.log.info("[testBlocksUntilRelease]"); - Thread threadA = new Thread(group, new Runnable() { - @Override - public void run() { - collaboration.acquireUninterruptibly("topicA"); - try { - flagTestBlocksUntilRelease = true; - while (flagTestBlocksUntilRelease) { - try { - Thread.sleep(10); - } catch (InterruptedException ignore) { - fail("interrupted"); - } - } - } finally { - collaboration.release(); - } - } - }); - - // thread one acquires - threadA.start(); - WaitCriterion ev = new WaitCriterion() { - @Override - public boolean done() { - return CollaborationJUnitTest.this.flagTestBlocksUntilRelease; - } - - @Override - public String description() { - return "waiting for thread"; - } - }; - Wait.waitForCriterion(ev, 5 * 1000, 200, true); - assertTrue(this.collaboration.hasCurrentTopic(threadA)); - - // thread two blocks until one releeases - Thread threadB = new Thread(group, new Runnable() { - @Override - public void run() { - threadBStartedTestBlocksUntilRelease = true; - collaboration.acquireUninterruptibly("topicB"); - try { - flagTestBlocksUntilRelease = true; - WaitCriterion ev2 = new WaitCriterion() { - @Override - public boolean done() { - return !flagTestBlocksUntilRelease; - } - - @Override - public String description() { - return "waiting for release"; - } - }; - Wait.waitForCriterion(ev2, 20 * 1000, 200, true); - } finally { - collaboration.release(); - } - } - }); - - // start up threadB - threadB.start(); - ev = new WaitCriterion() { - @Override - public boolean done() { - return threadBStartedTestBlocksUntilRelease; - } - - @Override - public String description() { - return "waiting for thread b"; - } - }; - Wait.waitForCriterion(ev, 5 * 1000, 200, true); - - // threadA holds topic and threadB is waiting... - assertTrue(this.collaboration.hasCurrentTopic(threadA)); - assertFalse(this.collaboration.hasCurrentTopic(threadB)); - - // let threadA release so that threadB gets lock - this.flagTestBlocksUntilRelease = false; - ThreadUtils.join(threadA, 30 * 1000); - - // make sure threadB is doing what it's supposed to do... - ev = new WaitCriterion() { - @Override - public boolean done() { - return flagTestBlocksUntilRelease; - } - - @Override - public String description() { - return "threadB"; - } - }; - Wait.waitForCriterion(ev, 5 * 1000, 200, true); - // threadB must have lock now... let threadB release - assertTrue(this.collaboration.hasCurrentTopic(threadB)); - this.flagTestBlocksUntilRelease = false; - ThreadUtils.join(threadB, 30 * 1000); - - // collaboration should be free now - assertFalse(this.collaboration.hasCurrentTopic(threadA)); - assertFalse(this.collaboration.hasCurrentTopic(threadB)); - assertFalse(this.collaboration.hasCurrentTopic()); - } - - protected volatile boolean threadAFlag_TestLateComerJoinsIn = false; - protected volatile boolean threadBFlag_TestLateComerJoinsIn = false; - protected volatile boolean threadCFlag_TestLateComerJoinsIn = true; - protected volatile boolean threadDFlag_TestLateComerJoinsIn = false; - - @Test - public void testLateComerJoinsIn() throws Exception { - this.log.info("[testLateComerJoinsIn]"); - - final Object topicA = "topicA"; - final Object topicB = "topicB"; - - // threads one and two acquire - Thread threadA = new Thread(group, new Runnable() { - @Override - public void run() { - collaboration.acquireUninterruptibly(topicA); - try { - threadAFlag_TestLateComerJoinsIn = true; - WaitCriterion ev = new WaitCriterion() { - @Override - public boolean done() { - return !threadAFlag_TestLateComerJoinsIn; - } - - @Override - public String description() { - return null; - } - }; - Wait.waitForCriterion(ev, 60 * 1000, 200, true); - } finally { - collaboration.release(); - } - } - }); - threadA.start(); - WaitCriterion ev = new WaitCriterion() { - @Override - public boolean done() { - return threadAFlag_TestLateComerJoinsIn; - } - - @Override - public String description() { - return "wait for ThreadA"; - } - }; - Wait.waitForCriterion(ev, 30 * 1000, 200, true); - assertTrue(this.collaboration.hasCurrentTopic(threadA)); - assertTrue(this.collaboration.isCurrentTopic(topicA)); - - Thread threadB = new Thread(group, new Runnable() { - @Override - public void run() { - collaboration.acquireUninterruptibly(topicA); - try { - threadBFlag_TestLateComerJoinsIn = true; - WaitCriterion ev2 = new WaitCriterion() { - @Override - public boolean done() { - return !threadBFlag_TestLateComerJoinsIn; - } - - @Override - public String description() { - return null; - } - }; - Wait.waitForCriterion(ev2, 60 * 1000, 200, true); - } finally { - collaboration.release(); - } - } - }); - threadB.start(); - ev = new WaitCriterion() { - @Override - public boolean done() { - return threadBFlag_TestLateComerJoinsIn; - } - - @Override - public String description() { - return ""; - } - }; - Wait.waitForCriterion(ev, 60 * 1000, 200, true); - assertTrue(this.collaboration.hasCurrentTopic(threadB)); - - // thread three blocks for new topic - Thread threadC = new Thread(group, new Runnable() { - @Override - public void run() { - threadCFlag_TestLateComerJoinsIn = false; - collaboration.acquireUninterruptibly(topicB); - try { - threadCFlag_TestLateComerJoinsIn = true; - WaitCriterion ev2 = new WaitCriterion() { - @Override - public boolean done() { - return !threadCFlag_TestLateComerJoinsIn; - } - - @Override - public String description() { - return null; - } - }; - Wait.waitForCriterion(ev2, 60 * 1000, 200, true); - } finally { - collaboration.release(); - } - } - }); - threadC.start(); - ev = new WaitCriterion() { - @Override - public boolean done() { - return threadCFlag_TestLateComerJoinsIn; - } - - @Override - public String description() { - return null; - } - }; - Wait.waitForCriterion(ev, 60 * 1000, 200, true); - assertFalse(this.collaboration.hasCurrentTopic(threadC)); - assertFalse(this.collaboration.isCurrentTopic(topicB)); - - // thread four (lateComer) acquires current topic immediately - Thread threadD = new Thread(group, new Runnable() { - @Override - public void run() { - collaboration.acquireUninterruptibly(topicA); - try { - threadDFlag_TestLateComerJoinsIn = true; - while (threadDFlag_TestLateComerJoinsIn) { - try { - Thread.sleep(10); - } catch (InterruptedException ignore) { - fail("interrupted"); - } - } - } finally { - collaboration.release(); - } - } - }); - threadD.start(); - ev = new WaitCriterion() { - @Override - public boolean done() { - return threadDFlag_TestLateComerJoinsIn; - } - - @Override - public String description() { - return null; - } - }; - Wait.waitForCriterion(ev, 60 * 1000, 200, true); - assertTrue(this.collaboration.hasCurrentTopic(threadD)); - - // release threadA - this.threadAFlag_TestLateComerJoinsIn = false; - ThreadUtils.join(threadA, 30 * 1000); - assertFalse(this.collaboration.hasCurrentTopic(threadA)); - assertTrue(this.collaboration.hasCurrentTopic(threadB)); - assertFalse(this.collaboration.hasCurrentTopic(threadC)); - assertTrue(this.collaboration.hasCurrentTopic(threadD)); - assertTrue(this.collaboration.isCurrentTopic(topicA)); - assertFalse(this.collaboration.isCurrentTopic(topicB)); - - // release threadB - this.threadBFlag_TestLateComerJoinsIn = false; - ThreadUtils.join(threadB, 30 * 1000); - assertFalse(this.collaboration.hasCurrentTopic(threadB)); - assertFalse(this.collaboration.hasCurrentTopic(threadC)); - assertTrue(this.collaboration.hasCurrentTopic(threadD)); - assertTrue(this.collaboration.isCurrentTopic(topicA)); - assertFalse(this.collaboration.isCurrentTopic(topicB)); - - // release threadD - this.threadDFlag_TestLateComerJoinsIn = false; - ThreadUtils.join(threadD, 30 * 1000); - ev = new WaitCriterion() { - @Override - public boolean done() { - return threadCFlag_TestLateComerJoinsIn; - } - - @Override - public String description() { - return null; - } - }; - Wait.waitForCriterion(ev, 60 * 1000, 200, true); - assertTrue(this.collaboration.hasCurrentTopic(threadC)); - assertFalse(this.collaboration.hasCurrentTopic(threadD)); - assertFalse(this.collaboration.isCurrentTopic(topicA)); - assertTrue(this.collaboration.isCurrentTopic(topicB)); - - // release threadC - this.threadCFlag_TestLateComerJoinsIn = false; - ThreadUtils.join(threadC, 30 * 1000); - assertFalse(this.collaboration.hasCurrentTopic(threadC)); - assertFalse(this.collaboration.isCurrentTopic(topicA)); - assertFalse(this.collaboration.isCurrentTopic(topicB)); - } - - protected List waitingList = Collections.synchronizedList(new ArrayList()); - protected List fairnessList = Collections.synchronizedList(new ArrayList()); - protected volatile boolean runTestFairnessStressfully = true; - - @Test - public void testFairnessStressfully() throws Exception { - this.log.info("[testFairnessStressfully]"); - final int numThreads = 20; - Thread threads[] = new Thread[numThreads]; - - Runnable run = new Runnable() { - public void run() { - boolean released = false; - try { - String uniqueTopic = Thread.currentThread().getName(); - while (runTestFairnessStressfully) { - waitingList.add(uniqueTopic); - collaboration.acquireUninterruptibly(uniqueTopic); - try { - released = false; - fairnessList.add(uniqueTopic); - waitingList.remove(uniqueTopic); - } finally { - // wait for the other threads to line up... - WaitCriterion ev = new WaitCriterion() { - @Override - public boolean done() { - return !runTestFairnessStressfully || waitingList.size() >= numThreads - 1; - } - - @Override - public String description() { - return "other threads lining up"; - } - }; - Wait.waitForCriterion(ev, 60 * 1000, 200, true); - collaboration.release(); - released = true; - } - } - } finally { - if (!released) { - collaboration.release(); - } - } - } - }; - - try { - // many threads loop: acquire and release with unique topic - for (int t = 0; t < threads.length; t++) { - threads[t] = new Thread(group, run, String.valueOf(t)); - threads[t].start(); - } - - log.info("Started all threads... waiting for test to complete."); - - // wait for numThreads * 10 - WaitCriterion ev = new WaitCriterion() { - @Override - public boolean done() { - return fairnessList.size() >= numThreads * 20; - } - - @Override - public String description() { - return "waiting for numThreads * 10"; - } - }; - Wait.waitForCriterion(ev, 5 * 60 * 1000, 200, true); - } finally { - if (this.runTestFairnessStressfully) { - this.runTestFairnessStressfully = false; - } - } - - for (int t = 0; t < threads.length; t++) { - ThreadUtils.join(threads[t], 30 * 1000); - } - - // assert that all topics are acquired in order - // count number of occurrences of each thread - int count[] = new int[numThreads]; - for (int i = 0; i < count.length; i++) { // shouldn't be necessary - count[i] = 0; - } - synchronized (this.fairnessList) { - for (Iterator iter = this.fairnessList.iterator(); iter.hasNext();) { - int id = Integer.valueOf((String) iter.next()).intValue(); - count[id] = count[id] + 1; - } - } - - int totalLocks = 0; - int minLocks = Integer.MAX_VALUE; - int maxLocks = 0; - for (int i = 0; i < count.length; i++) { - int locks = count[i]; - this.log.fine("testFairnessStressfully thread-" + i + " acquired topic " + locks + " times."); - if (locks < minLocks) - minLocks = locks; - if (locks > maxLocks) - maxLocks = locks; - totalLocks = totalLocks + locks; - } - - this.log.info("[testFairnessStressfully] totalLocks=" + totalLocks + " minLocks=" + minLocks - + " maxLocks=" + maxLocks); - - int expectedLocks = (totalLocks / numThreads) + 1; - - // NOTE: if you turn on fine logs, this deviation may be too small... - // slower machines may also fail depending on thread scheduling - int deviation = (int) (expectedLocks * 0.25); - int lowThreshold = expectedLocks - deviation; - int highThreshold = expectedLocks + deviation; - - this.log.info("[testFairnessStressfully] deviation=" + deviation + " expectedLocks=" - + expectedLocks + " lowThreshold=" + lowThreshold + " highThreshold=" + highThreshold); - - // if these assertions keep failing we'll have to rewrite the test - // to handle scheduling of the threads... - - assertTrue("minLocks is less than lowThreshold", minLocks >= lowThreshold); - assertTrue("maxLocks is greater than highThreshold", maxLocks <= highThreshold); - } - - @Test - public void testHasCurrentTopic() throws Exception { - this.log.info("[testHasCurrentTopic]"); - assertTrue(!this.collaboration.hasCurrentTopic()); - this.collaboration.acquireUninterruptibly("testHasCurrentTopic"); - try { - assertTrue(this.collaboration.hasCurrentTopic()); - } finally { - this.collaboration.release(); - } - assertTrue(!this.collaboration.hasCurrentTopic()); - } - - protected volatile boolean flagTestThreadHasCurrentTopic = false; - - @Test - public void testThreadHasCurrentTopic() throws Exception { - this.log.info("[testThreadHasCurrentTopic]"); - Thread thread = new Thread(group, new Runnable() { - @Override - public void run() { - collaboration.acquireUninterruptibly("testThreadHasCurrentTopic"); - try { - flagTestThreadHasCurrentTopic = true; - WaitCriterion ev = new WaitCriterion() { - @Override - public boolean done() { - return !flagTestThreadHasCurrentTopic; - } - - @Override - public String description() { - return null; - } - }; - Wait.waitForCriterion(ev, 60 * 1000, 200, true); - } finally { - collaboration.release(); - } - } - }); - - // before starting thread, hasCurrentTopic(thread) returns false - assertTrue(!this.collaboration.hasCurrentTopic(thread)); - thread.start(); - WaitCriterion ev = new WaitCriterion() { - @Override - public boolean done() { - return flagTestThreadHasCurrentTopic; - } - - @Override - public String description() { - return null; - } - }; - Wait.waitForCriterion(ev, 60 * 1000, 200, true); - - // after starting thread, hasCurrentTopic(thread) returns true - assertTrue(this.collaboration.hasCurrentTopic(thread)); - this.flagTestThreadHasCurrentTopic = false; - ThreadUtils.join(thread, 30 * 1000); - - // after thread finishes, hasCurrentTopic(thread) returns false - assertTrue(!this.collaboration.hasCurrentTopic(thread)); - } - - @Test - public void testIsCurrentTopic() throws Exception { - this.log.info("[testIsCurrentTopic]"); - Object topic = "testIsCurrentTopic"; - assertTrue(!this.collaboration.isCurrentTopic(topic)); - this.collaboration.acquireUninterruptibly(topic); - try { - assertTrue(this.collaboration.isCurrentTopic(topic)); - } finally { - this.collaboration.release(); - } - assertTrue(!this.collaboration.isCurrentTopic(topic)); - } - - protected final ThreadGroup group = new ThreadGroup("CollaborationJUnitTest Threads") { - @Override - public void uncaughtException(Thread t, Throwable e) { - if (e instanceof VirtualMachineError) { - SystemFailure.setFailure((VirtualMachineError) e); // don't throw - } - String s = "Uncaught exception in thread " + t; - log.error(s, e); - fail(s); - } - }; -} -