http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Channel.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Channel.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Channel.java deleted file mode 100644 index 268a3fe..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Channel.java +++ /dev/null @@ -1,309 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -/* - File: Channel.java - - Originally written by Doug Lea and released into the public domain. - This may be used for any purposes whatsoever without acknowledgment. - Thanks for the assistance and support of Sun Microsystems Labs, - and everyone contributing, testing, and using this code. - - History: - Date Who What - 11Jun1998 dl Create public version - 25aug1998 dl added peek -*/ - -package com.gemstone.org.jgroups.oswego.concurrent; - -/** - * Main interface for buffers, queues, pipes, conduits, etc. - * <p> - * A Channel represents anything that you can put items - * into and take them out of. As with the Sync - * interface, both - * blocking (put(x), take), - * and timeouts (offer(x, msecs), poll(msecs)) policies - * are provided. Using a - * zero timeout for offer and poll results in a pure balking policy. - * <p> - * To aid in efforts to use Channels in a more typesafe manner, - * this interface extends Puttable and Takable. You can restrict - * arguments of instance variables to this type as a way of - * guaranteeing that producers never try to take, or consumers put. - * for example: - * <pre> - * class Producer implements Runnable { - * final Puttable chan; - * Producer(Puttable channel) { chan = channel; } - * public void run() { - * try { - * for(;;) { chan.put(produce()); } - * } - * catch (InterruptedException ex) {} - * } - * Object produce() { ... } - * } - * - * - * class Consumer implements Runnable { - * final Takable chan; - * Consumer(Takable channel) { chan = channel; } - * public void run() { - * try { - * for(;;) { consume(chan.take()); } - * } - * catch (InterruptedException ex) {} - * } - * void consume(Object x) { ... } - * } - * - * class Setup { - * void main() { - * Channel chan = new SomeChannelImplementation(); - * Producer p = new Producer(chan); - * Consumer c = new Consumer(chan); - * new Thread(p).start(); - * new Thread(c).start(); - * } - * } - * </pre> - * <p> - * A given channel implementation might or might not have bounded - * capacity or other insertion constraints, so in general, you cannot tell if - * a given put will block. However, - * Channels that are designed to - * have an element capacity (and so always block when full) - * should implement the - * BoundedChannel - * subinterface. - * <p> - * Channels may hold any kind of item. However, - * insertion of null is not in general supported. Implementations - * may (all currently do) throw IllegalArgumentExceptions upon attempts to - * insert null. - * <p> - * By design, the Channel interface does not support any methods to determine - * the current number of elements being held in the channel. - * This decision reflects the fact that in - * concurrent programming, such methods are so rarely useful - * that including them invites misuse; at best they could - * provide a snapshot of current - * state, that could change immediately after being reported. - * It is better practice to instead use poll and offer to try - * to take and put elements without blocking. For example, - * to empty out the current contents of a channel, you could write: - * <pre> - * try { - * for (;;) { - * Object item = channel.poll(0); - * if (item != null) - * process(item); - * else - * break; - * } - * } - * catch(InterruptedException ex) { ... } - * </pre> - * <p> - * However, it is possible to determine whether an item - * exists in a Channel via <code>peek</code>, which returns - * but does NOT remove the next item that can be taken (or null - * if there is no such item). The peek operation has a limited - * range of applicability, and must be used with care. Unless it - * is known that a given thread is the only possible consumer - * of a channel, and that no time-out-based <code>offer</code> operations - * are ever invoked, there is no guarantee that the item returned - * by peek will be available for a subsequent take. - * <p> - * When appropriate, you can define an isEmpty method to - * return whether <code>peek</code> returns null. - * <p> - * Also, as a compromise, even though it does not appear in interface, - * implementation classes that can readily compute the number - * of elements support a <code>size()</code> method. This allows careful - * use, for example in queue length monitors, appropriate to the - * particular implementation constraints and properties. - * <p> - * All channels allow multiple producers and/or consumers. - * They do not support any kind of <em>close</em> method - * to shut down operation or indicate completion of particular - * producer or consumer threads. - * If you need to signal completion, one way to do it is to - * create a class such as - * <pre> - * class EndOfStream { - * // Application-dependent field/methods - * } - * </pre> - * And to have producers put an instance of this class into - * the channel when they are done. The consumer side can then - * check this via - * <pre> - * Object x = aChannel.take(); - * if (x instanceof EndOfStream) - * // special actions; perhaps terminate - * else - * // process normally - * </pre> - * <p> - * In time-out based methods (poll(msecs) and offer(x, msecs), - * time bounds are interpreted in - * a coarse-grained, best-effort fashion. Since there is no - * way in Java to escape out of a wait for a synchronized - * method/block, time bounds can sometimes be exceeded when - * there is a lot contention for the channel. Additionally, - * some Channel semantics entail a ``point of - * no return'' where, once some parts of the operation have completed, - * others must follow, regardless of time bound. - * <p> - * Interruptions are in general handled as early as possible - * in all methods. Normally, InterruptionExceptions are thrown - * in put/take and offer(msec)/poll(msec) if interruption - * is detected upon entry to the method, as well as in any - * later context surrounding waits. - * <p> - * If a put returns normally, an offer - * returns true, or a put or poll returns non-null, the operation - * completed successfully. - * In all other cases, the operation fails cleanly -- the - * element is not put or taken. - * <p> - * As with Sync classes, spinloops are not directly supported, - * are not particularly recommended for routine use, but are not hard - * to construct. For example, here is an exponential backoff version: - * <pre> - * Object backOffTake(Channel q) throws InterruptedException { - * long waitTime = 0; - * for (;;) { - * Object x = q.poll(0); - * if (x != null) - * return x; - * else { - * Thread.sleep(waitTime); - * waitTime = 3 * waitTime / 2 + 1; - * } - * } - * </pre> - * <p> - * <b>Sample Usage</b>. Here is a producer/consumer design - * where the channel is used to hold Runnable commands representing - * background tasks. - * <pre> - * class Service { - * private final Channel channel = ... some Channel implementation; - * - * private void backgroundTask(int taskParam) { ... } - * - * public void action(final int arg) { - * Runnable command = - * new Runnable() { - * public void run() { backgroundTask(arg); } - * }; - * try { channel.put(command) } - * catch (InterruptedException ex) { - * Thread.currentThread().interrupt(); // ignore but propagate - * } - * } - * - * public Service() { - * Runnable backgroundLoop = - * new Runnable() { - * public void run() { - * for (;;) { - * try { - * Runnable task = (Runnable)(channel.take()); - * task.run(); - * } - * catch (InterruptedException ex) { return; } - * } - * } - * }; - * new Thread(backgroundLoop).start(); - * } - * } - * - * </pre> - * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] - * @see Sync - * @see BoundedChannel -**/ - -public interface Channel extends Puttable, Takable { - - /** - * Place item in the channel, possibly waiting indefinitely until - * it can be accepted. Channels implementing the BoundedChannel - * subinterface are generally guaranteed to block on puts upon - * reaching capacity, but other implementations may or may not block. - * @param item the element to be inserted. Should be non-null. - * @exception InterruptedException if the current thread has - * been interrupted at a point at which interruption - * is detected, in which case the element is guaranteed not - * to be inserted. Otherwise, on normal return, the element is guaranteed - * to have been inserted. - **/ - public void put(Object item) throws InterruptedException; - - /** - * Place item in channel only if it can be accepted within - * msecs milliseconds. The time bound is interpreted in - * a coarse-grained, best-effort fashion. - * @param item the element to be inserted. Should be non-null. - * @param msecs the number of milliseconds to wait. If less than - * or equal to zero, the method does not perform any timed waits, - * but might still require - * access to a synchronization lock, which can impose unbounded - * delay if there is a lot of contention for the channel. - * @return true if accepted, else false - * @exception InterruptedException if the current thread has - * been interrupted at a point at which interruption - * is detected, in which case the element is guaranteed not - * to be inserted (i.e., is equivalent to a false return). - **/ - public boolean offer(Object item, long msecs) throws InterruptedException; - - /** - * Return and remove an item from channel, - * possibly waiting indefinitely until - * such an item exists. - * @return some item from the channel. Different implementations - * may guarantee various properties (such as FIFO) about that item - * @exception InterruptedException if the current thread has - * been interrupted at a point at which interruption - * is detected, in which case state of the channel is unchanged. - * - **/ - public Object take() throws InterruptedException; - - - /** - * Return and remove an item from channel only if one is available within - * msecs milliseconds. The time bound is interpreted in a coarse - * grained, best-effort fashion. - * @param msecs the number of milliseconds to wait. If less than - * or equal to zero, the operation does not perform any timed waits, - * but might still require - * access to a synchronization lock, which can impose unbounded - * delay if there is a lot of contention for the channel. - * @return some item, or null if the channel is empty. - * @exception InterruptedException if the current thread has - * been interrupted at a point at which interruption - * is detected, in which case state of the channel is unchanged - * (i.e., equivalent to a null return). - **/ - - public Object poll(long msecs) throws InterruptedException; - - /** - * Return, but do not remove object at head of Channel, - * or null if it is empty. - **/ - - public Object peek(); - - -} -
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/ClockDaemon.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/ClockDaemon.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/ClockDaemon.java deleted file mode 100644 index d21e929..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/ClockDaemon.java +++ /dev/null @@ -1,403 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -/* - File: ClockDaemon.java - - Originally written by Doug Lea and released into the public domain. - This may be used for any purposes whatsoever without acknowledgment. - Thanks for the assistance and support of Sun Microsystems Labs, - and everyone contributing, testing, and using this code. - - History: - Date Who What - 29Aug1998 dl created initial public version - 17dec1998 dl null out thread after shutdown -*/ - -package com.gemstone.org.jgroups.oswego.concurrent; - -import java.util.Date; - -/** - * A general-purpose time-based daemon, vaguely similar in functionality - * to common system-level utilities such as <code>at</code> - * (and the associated crond) in Unix. - * Objects of this class maintain a single thread and a task queue - * that may be used to execute Runnable commands in any of three modes -- - * absolute (run at a given time), relative (run after a given delay), - * and periodic (cyclically run with a given delay). - * <p> - * All commands are executed by the single background thread. - * The thread is not actually started until the first - * request is encountered. Also, if the - * thread is stopped for any reason, one is started upon encountering - * the next request, or <code>restart()</code> is invoked. - * <p> - * If you would instead like commands run in their own threads, you can - * use as arguments Runnable commands that start their own threads - * (or perhaps wrap within ThreadedExecutors). - * <p> - * You can also use multiple - * daemon objects, each using a different background thread. However, - * one of the reasons for using a time daemon is to pool together - * processing of infrequent tasks using a single background thread. - * <p> - * Background threads are created using a ThreadFactory. The - * default factory does <em>not</em> - * automatically <code>setDaemon</code> status. - * <p> - * The class uses Java timed waits for scheduling. These can vary - * in precision across platforms, and provide no real-time guarantees - * about meeting deadlines. - * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] - **/ - -public class ClockDaemon extends ThreadFactoryUser { - - - /** tasks are maintained in a standard priority queue **/ - protected final Heap heap_ = new Heap(DefaultChannelCapacity.get()); - - - protected static class TaskNode implements Comparable { - final Runnable command; // The command to run - final long period; // The cycle period, or -1 if not periodic - private long timeToRun_; // The time to run command - - // Cancellation does not immediately remove node, it just - // sets up lazy deletion bit, so is thrown away when next - // encountered in run loop - - private boolean cancelled_ = false; - - // Access to cancellation status and and run time needs sync - // since they can be written and read in different threads - - synchronized void setCancelled() { cancelled_ = true; } - synchronized boolean getCancelled() { return cancelled_; } - - synchronized void setTimeToRun(long w) { timeToRun_ = w; } - synchronized long getTimeToRun() { return timeToRun_; } - - - public int compareTo(Object other) { - long a = getTimeToRun(); - long b = ((TaskNode)(other)).getTimeToRun(); - return (a < b)? -1 : ((a == b)? 0 : 1); - } - - @Override - public boolean equals(Object o) { // GemStoneAddition - if (o == null || !(o instanceof TaskNode)) return false; - return this.compareTo(o) == 0; - } - - @Override - public int hashCode() { // GemStoneAddition - return (int)getTimeToRun(); - } - - TaskNode(long w, Runnable c, long p) { - timeToRun_ = w; command = c; period = p; - } - - TaskNode(long w, Runnable c) { this(w, c, -1); } - } - - - /** - * Execute the given command at the given time. - * @param date -- the absolute time to run the command, expressed - * as a java.util.Date. - * @param command -- the command to run at the given time. - * @return taskID -- an opaque reference that can be used to cancel execution request - **/ - public Object executeAt(Date date, Runnable command) { - TaskNode task = new TaskNode(date.getTime(), command); - heap_.insert(task); - restart(); - return task; - } - - /** - * Excecute the given command after waiting for the given delay. - * <p> - * <b>Sample Usage.</b> - * You can use a ClockDaemon to arrange timeout callbacks to break out - * of stuck IO. For example (code sketch): - * <pre> - * class X { ... - * - * ClockDaemon timer = ... - * Thread readerThread; - * FileInputStream datafile; - * - * void startReadThread() { - * datafile = new FileInputStream("data", ...); - * - * readerThread = new Thread(new Runnable() { - * public void run() { - * for(;;) { - * // try to gracefully exit before blocking - * if (Thread.currentThread().isInterrupted()) { - * quietlyWrapUpAndReturn(); - * } - * else { - * try { - * int c = datafile.read(); - * if (c == -1) break; - * else process(c); - * } - * catch (IOException ex) { - * cleanup(); - * return; - * } - * } - * } }; - * - * readerThread.start(); - * - * // establish callback to cancel after 60 seconds - * timer.executeAfterDelay(60000, new Runnable() { - * readerThread.interrupt(); // try to interrupt thread - * datafile.close(); // force thread to lose its input file - * }); - * } - * } - * </pre> - * @param millisecondsToDelay -- the number of milliseconds - * from now to run the command. - * @param command -- the command to run after the delay. - * @return taskID -- an opaque reference that can be used to cancel execution request - **/ - public Object executeAfterDelay(long millisecondsToDelay, Runnable command) { - long runtime = System.currentTimeMillis() + millisecondsToDelay; - TaskNode task = new TaskNode(runtime, command); - heap_.insert(task); - restart(); - return task; - } - - /** - * Execute the given command every <code>period</code> milliseconds. - * If <code>startNow</code> is true, execution begins immediately, - * otherwise, it begins after the first <code>period</code> delay. - * <p> - * <b>Sample Usage</b>. Here is one way - * to update Swing components acting as progress indicators for - * long-running actions. - * <pre> - * class X { - * JLabel statusLabel = ...; - * - * int percentComplete = 0; - * synchronized int getPercentComplete() { return percentComplete; } - * synchronized void setPercentComplete(int p) { percentComplete = p; } - * - * ClockDaemon cd = ...; - * - * void startWorking() { - * Runnable showPct = new Runnable() { - * public void run() { - * SwingUtilities.invokeLater(new Runnable() { - * public void run() { - * statusLabel.setText(getPercentComplete() + "%"); - * } - * } - * } - * }; - * - * final Object updater = cd.executePeriodically(500, showPct, true); - * - * Runnable action = new Runnable() { - * public void run() { - * for (int i = 0; i < 100; ++i) { - * work(); - * setPercentComplete(i); - * } - * cd.cancel(updater); - * } - * }; - * - * new Thread(action).start(); - * } - * } - * </pre> - * @param period -- the period, in milliseconds. Periods are - * measured from start-of-task to the next start-of-task. It is - * generally a bad idea to use a period that is shorter than - * the expected task duration. - * @param command -- the command to run at each cycle - * @param startNow -- true if the cycle should start with execution - * of the task now. Otherwise, the cycle starts with a delay of - * <code>period</code> milliseconds. - * @exception IllegalArgumentException if period less than or equal to zero. - * @return taskID -- an opaque reference that can be used to cancel execution request - **/ - public Object executePeriodically(long period, - Runnable command, - boolean startNow) { - - if (period <= 0) throw new IllegalArgumentException(); - - long firstTime = System.currentTimeMillis(); - if (!startNow) firstTime += period; - - TaskNode task = new TaskNode(firstTime, command, period); - heap_.insert(task); - restart(); - return task; - } - - /** - * Cancel a scheduled task that has not yet been run. - * The task will be cancelled - * upon the <em>next</em> opportunity to run it. This has no effect if - * this is a one-shot task that has already executed. - * Also, if an execution is in progress, it will complete normally. - * (It may however be interrupted via getThread().interrupt()). - * But if it is a periodic task, future iterations are cancelled. - * @param taskID -- a task reference returned by one of - * the execute commands - * @exception ClassCastException if the taskID argument is not - * of the type returned by an execute command. - **/ - public static void cancel(Object taskID) { - ((TaskNode)taskID).setCancelled(); - } - - - /** The thread used to process commands **/ - protected Thread thread_; - - - /** - * Return the thread being used to process commands, or - * null if there is no such thread. You can use this - * to invoke any special methods on the thread, for - * example, to interrupt it. - **/ - public synchronized Thread getThread() { - return thread_; - } - - /** set thread_ to null to indicate termination **/ - protected synchronized void clearThread() { - thread_ = null; - } - - /** - * Start (or restart) a thread to process commands, or wake - * up an existing thread if one is already running. This - * method can be invoked if the background thread crashed - * due to an unrecoverable exception in an executed command. - **/ - - public synchronized void restart() { - if (thread_ == null) { - thread_ = threadFactory_.newThread(runLoop_); - thread_.start(); - } - else - notify(); - } - - - /** - * Cancel all tasks and interrupt the background thread executing - * the current task, if any. - * A new background thread will be started if new execution - * requests are encountered. If the currently executing task - * does not repsond to interrupts, the current thread may persist, even - * if a new thread is started via restart(). - **/ - public synchronized void shutDown() { - heap_.clear(); - if (thread_ != null) - thread_.interrupt(); - thread_ = null; - } - - /** Return the next task to execute, or null if thread is interrupted **/ - protected synchronized TaskNode nextTask() { - - // Note: This code assumes that there is only one run loop thread - - try { - while (!Thread.interrupted()) { - - // Using peek simplifies dealing with spurious wakeups - - TaskNode task = (TaskNode)(heap_.peek()); - - if (task == null) { - wait(); - } - else { - long now = System.currentTimeMillis(); - long when = task.getTimeToRun(); - - if (when > now) { // false alarm wakeup - wait(when - now); - } - else { - task = (TaskNode)(heap_.extract()); - - if (!task.getCancelled()) { // Skip if cancelled by - - if (task.period > 0) { // If periodic, requeue - task.setTimeToRun(now + task.period); - heap_.insert(task); - } - - return task; - } - } - } - } - } - catch (InterruptedException ex) { Thread.currentThread().interrupt(); /* GemStoneAddition */ } // fall through - - return null; // on interrupt - } - - /** - * The runloop is isolated in its own Runnable class - * just so that the main - * class need not implement Runnable, which would - * allow others to directly invoke run, which is not supported. - **/ - - protected class RunLoop implements Runnable { - public void run() { - try { - for (;;) { - if (Thread.interrupted()) break; // GemStoneAddition - TaskNode task = nextTask(); - if (task != null) - task.command.run(); - else - break; - } - } - finally { - clearThread(); - } - } - } - - protected final RunLoop runLoop_; - - /** - * Create a new ClockDaemon - **/ - - public ClockDaemon() { - runLoop_ = new RunLoop(); - } - - - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/CondVar.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/CondVar.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/CondVar.java deleted file mode 100644 index fb4006d..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/CondVar.java +++ /dev/null @@ -1,277 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -/* - File: ConditionVariable.java - - Originally written by Doug Lea and released into the public domain. - This may be used for any purposes whatsoever without acknowledgment. - Thanks for the assistance and support of Sun Microsystems Labs, - and everyone contributing, testing, and using this code. - - History: - Date Who What - 11Jun1998 dl Create public version -*/ - -package com.gemstone.org.jgroups.oswego.concurrent; - -/** - * This class is designed for fans of POSIX pthreads programming. - * If you restrict yourself to Mutexes and CondVars, you can - * use most of your favorite constructions. Don't randomly mix them - * with synchronized methods or blocks though. - * <p> - * Method names and behavior are as close as is reasonable to - * those in POSIX. - * <p> - * <b>Sample Usage.</b> Here is a full version of a bounded buffer - * that implements the BoundedChannel interface, written in - * a style reminscent of that in POSIX programming books. - * <pre> - * class CVBuffer implements BoundedChannel { - * private final Mutex mutex; - * private final CondVar notFull; - * private final CondVar notEmpty; - * private int count = 0; - * private int takePtr = 0; - * private int putPtr = 0; - * private final Object[] array; - * - * public CVBuffer(int capacity) { - * array = new Object[capacity]; - * mutex = new Mutex(); - * notFull = new CondVar(mutex); - * notEmpty = new CondVar(mutex); - * } - * - * public int capacity() { return array.length; } - * - * public void put(Object x) throws InterruptedException { - * mutex.acquire(); - * try { - * while (count == array.length) { - * notFull.await(); - * } - * array[putPtr] = x; - * putPtr = (putPtr + 1) % array.length; - * ++count; - * notEmpty.signal(); - * } - * finally { - * mutex.release(); - * } - * } - * - * public Object take() throws InterruptedException { - * Object x = null; - * mutex.acquire(); - * try { - * while (count == 0) { - * notEmpty.await(); - * } - * x = array[takePtr]; - * array[takePtr] = null; - * takePtr = (takePtr + 1) % array.length; - * --count; - * notFull.signal(); - * } - * finally { - * mutex.release(); - * } - * return x; - * } - * - * public boolean offer(Object x, long msecs) throws InterruptedException { - * mutex.acquire(); - * try { - * if (count == array.length) { - * notFull.timedwait(msecs); - * if (count == array.length) - * return false; - * } - * array[putPtr] = x; - * putPtr = (putPtr + 1) % array.length; - * ++count; - * notEmpty.signal(); - * return true; - * } - * finally { - * mutex.release(); - * } - * } - * - * public Object poll(long msecs) throws InterruptedException { - * Object x = null; - * mutex.acquire(); - * try { - * if (count == 0) { - * notEmpty.timedwait(msecs); - * if (count == 0) - * return null; - * } - * x = array[takePtr]; - * array[takePtr] = null; - * takePtr = (takePtr + 1) % array.length; - * --count; - * notFull.signal(); - * } - * finally { - * mutex.release(); - * } - * return x; - * } - * } - * - * </pre> - * @see Mutex - * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] - - **/ - -public class CondVar { - - /** The mutex **/ - protected final Sync mutex_; - - /** - * Create a new CondVar that relies on the given mutual - * exclusion lock. - * @param mutex A non-reentrant mutual exclusion lock. - * Standard usage is to supply an instance of <code>Mutex</code>, - * but, for example, a Semaphore initialized to 1 also works. - * On the other hand, many other Sync implementations would not - * work here, so some care is required to supply a sensible - * synchronization object. - * In normal use, the mutex should be one that is used for <em>all</em> - * synchronization of the object using the CondVar. Generally, - * to prevent nested monitor lockouts, this - * object should not use any native Java synchronized blocks. - **/ - - public CondVar(Sync mutex) { - mutex_ = mutex; - } - - /** - * Wait for notification. This operation at least momentarily - * releases the mutex. The mutex is always held upon return, - * even if interrupted. - * @exception InterruptedException if the thread was interrupted - * before or during the wait. However, if the thread is interrupted - * after the wait but during mutex re-acquisition, the interruption - * is ignored, while still ensuring - * that the currentThread's interruption state stays true, so can - * be probed by callers. - **/ - public void await() throws InterruptedException { - if (Thread.interrupted()) throw new InterruptedException(); - try { - synchronized(this) { - mutex_.release(); - try { - wait(); - } - catch (InterruptedException ex) { - notify(); - throw ex; - } - } - } - finally { - // Must ignore interrupt on re-acquire - for (;;) { - boolean interrupted = Thread.interrupted(); // GemStoneAddition - try { - mutex_.acquire(); - break; - } - catch (InterruptedException ex) { - interrupted = true; - } - finally { // GemStoneAddition - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - } -// if (interrupted) { -// Thread.currentThread().interrupt(); -// } - } - } - - /** - * Wait for at most msecs for notification. - * This operation at least momentarily - * releases the mutex. The mutex is always held upon return, - * even if interrupted. - * @param msecs The time to wait. A value less than or equal to zero - * causes a momentarily release - * and re-acquire of the mutex, and always returns false. - * @return false if at least msecs have elapsed - * upon resumption; else true. A - * false return does NOT necessarily imply that the thread was - * not notified. For example, it might have been notified - * after the time elapsed but just before resuming. - * @exception InterruptedException if the thread was interrupted - * before or during the wait. - **/ - - public boolean timedwait(long msecs) throws InterruptedException { - if (Thread.interrupted()) throw new InterruptedException(); - boolean success = false; - try { - synchronized(this) { - mutex_.release(); - try { - if (msecs > 0) { - long start = System.currentTimeMillis(); - wait(msecs); - success = System.currentTimeMillis() - start <= msecs; - } - } - catch (InterruptedException ex) { - notify(); - throw ex; - } - } - } - finally { - // Must ignore interrupt on re-acquire -// boolean interrupted = false; GemStoneAddition - for (;;) { - boolean interrupted = Thread.interrupted(); // GemStoneAddition - try { - mutex_.acquire(); - break; - } - catch (InterruptedException ex) { - interrupted = true; - } - finally { // GemStoneAddition - if (interrupted) Thread.currentThread().interrupt(); - } - } -// if (interrupted) { -// Thread.currentThread().interrupt(); -// } - } - return success; - } - - /** - * Notify a waiting thread. - * If one exists, a non-interrupted thread will return - * normally (i.e., not via InterruptedException) from await or timedwait. - **/ - public synchronized void signal() { - notify(); - } - - /** Notify all waiting threads **/ - public synchronized void broadcast() { - notifyAll(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/CountDown.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/CountDown.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/CountDown.java deleted file mode 100644 index d41baa4..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/CountDown.java +++ /dev/null @@ -1,126 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -/* - File: CountDown.java - - Originally written by Doug Lea and released into the public domain. - This may be used for any purposes whatsoever without acknowledgment. - Thanks for the assistance and support of Sun Microsystems Labs, - and everyone contributing, testing, and using this code. - - History: - Date Who What - 11Jun1998 dl Create public version -*/ - -package com.gemstone.org.jgroups.oswego.concurrent; - -/** - * A CountDown can serve as a simple one-shot barrier. - * A Countdown is initialized - * with a given count value. Each release decrements the count. - * All acquires block until the count reaches zero. Upon reaching - * zero all current acquires are unblocked and all - * subsequent acquires pass without blocking. This is a one-shot - * phenomenon -- the count cannot be reset. - * If you need a version that resets the count, consider - * using a Barrier. - * <p> - * <b>Sample usage.</b> Here are a set of classes in which - * a group of worker threads use a countdown to - * notify a driver when all threads are complete. - * <pre> - * class Worker implements Runnable { - * private final CountDown done; - * Worker(CountDown d) { done = d; } - * public void run() { - * doWork(); - * done.release(); - * } - * } - * - * class Driver { // ... - * void main() { - * CountDown done = new CountDown(N); - * for (int i = 0; i < N; ++i) - * new Thread(new Worker(done)).start(); - * doSomethingElse(); - * done.acquire(); // wait for all to finish - * } - * } - * </pre> - * - * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] - * -**/ - -public class CountDown implements Sync { - protected final int initialCount_; - protected int count_; - - /** Create a new CountDown with given count value **/ - public CountDown(int count) { count_ = initialCount_ = count; } - - - /* - This could use double-check, but doesn't out of concern - for surprising effects on user programs stemming - from lack of memory barriers with lack of synch. - */ - public void acquire() throws InterruptedException { - if (Thread.interrupted()) throw new InterruptedException(); - synchronized(this) { - while (count_ > 0) - wait(); - } - } - - - public boolean attempt(long msecs) throws InterruptedException { - if (Thread.interrupted()) throw new InterruptedException(); - synchronized(this) { - if (count_ <= 0) - return true; - else if (msecs <= 0) - return false; - else { - long waitTime = msecs; - long start = System.currentTimeMillis(); - for (;;) { - wait(waitTime); - if (count_ <= 0) - return true; - else { - waitTime = msecs - (System.currentTimeMillis() - start); - if (waitTime <= 0) - return false; - } - } - } - } - } - - /** - * Decrement the count. - * After the initialCount'th release, all current and future - * acquires will pass - **/ - public synchronized void release() { - if (--count_ == 0) - notifyAll(); - } - - /** Return the initial count value **/ - public int initialCount() { return initialCount_; } - - - /** - * Return the current count value. - * This is just a snapshot value, that may change immediately - * after returning. - **/ - public synchronized int currentCount() { return count_; } -} - http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/CyclicBarrier.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/CyclicBarrier.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/CyclicBarrier.java deleted file mode 100644 index 9b55352..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/CyclicBarrier.java +++ /dev/null @@ -1,299 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -/* - File: CyclicBarrier.java - - Originally written by Doug Lea and released into the public domain. - This may be used for any purposes whatsoever without acknowledgment. - Thanks for the assistance and support of Sun Microsystems Labs, - and everyone contributing, testing, and using this code. - - History: - Date Who What - 11Jul1998 dl Create public version - 28Aug1998 dl minor code simplification -*/ - -package com.gemstone.org.jgroups.oswego.concurrent; - -/** - * A cyclic barrier is a reasonable choice for a barrier in contexts - * involving a fixed sized group of threads that - * must occasionally wait for each other. - * (A Rendezvous better handles applications in which - * any number of threads meet, n-at-a-time.) - * <p> - * CyclicBarriers use an all-or-none breakage model - * for failed synchronization attempts: If threads - * leave a barrier point prematurely because of timeout - * or interruption, others will also leave abnormally - * (via BrokenBarrierException), until - * the barrier is <code>restart</code>ed. This is usually - * the simplest and best strategy for sharing knowledge - * about failures among cooperating threads in the most - * common usages contexts of Barriers. - * This implementation has the property that interruptions - * among newly arriving threads can cause as-yet-unresumed - * threads from a previous barrier cycle to return out - * as broken. This transmits breakage - * as early as possible, but with the possible byproduct that - * only some threads returning out of a barrier will realize - * that it is newly broken. (Others will not realize this until a - * future cycle.) (The Rendezvous class has a more uniform, but - * sometimes less desirable policy.) - * <p> - * Barriers support an optional Runnable command - * that is run once per barrier point. - * <p> - * <b>Sample usage</b> Here is a code sketch of - * a barrier in a parallel decomposition design. - * <pre> - * class Solver { - * final int N; - * final float[][] data; - * final CyclicBarrier barrier; - * - * class Worker implements Runnable { - * int myRow; - * Worker(int row) { myRow = row; } - * public void run() { - * while (!done()) { - * processRow(myRow); - * - * try { - * barrier.barrier(); - * } - * catch (InterruptedException ex) { return; } - * catch (BrokenBarrierException ex) { return; } - * } - * } - * } - * - * public Solver(float[][] matrix) { - * data = matrix; - * N = matrix.length; - * barrier = new CyclicBarrier(N); - * barrier.setBarrierCommand(new Runnable() { - * public void run() { mergeRows(...); } - * }); - * for (int i = 0; i < N; ++i) { - * new Thread(new Worker(i)).start(); - * waitUntilDone(); - * } - * } - * </pre> - * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] - - **/ -public class CyclicBarrier implements Barrier { - - protected final int parties_; - protected boolean broken_ = false; - protected Runnable barrierCommand_ = null; - protected int count_; // number of parties still waiting - protected int resets_ = 0; // incremented on each release - - /** - * Create a CyclicBarrier for the indicated number of parties, - * and no command to run at each barrier. - * @exception IllegalArgumentException if parties less than or equal to zero. - **/ - - public CyclicBarrier(int parties) { this(parties, null); } - - /** - * Create a CyclicBarrier for the indicated number of parties. - * and the given command to run at each barrier point. - * @exception IllegalArgumentException if parties less than or equal to zero. - **/ - - public CyclicBarrier(int parties, Runnable command) { - if (parties <= 0) throw new IllegalArgumentException(); - parties_ = parties; - count_ = parties; - barrierCommand_ = command; - } - - /** - * Set the command to run at the point at which all threads reach the - * barrier. This command is run exactly once, by the thread - * that trips the barrier. The command is not run if the barrier is - * broken. - * @param command the command to run. If null, no command is run. - * @return the previous command - **/ - - public synchronized Runnable setBarrierCommand(Runnable command) { - Runnable old = barrierCommand_; - barrierCommand_ = command; - return old; - } - - public synchronized boolean broken() { return broken_; } - - /** - * Reset to initial state. Clears both the broken status - * and any record of waiting threads, and releases all - * currently waiting threads with indeterminate return status. - * This method is intended only for use in recovery actions - * in which it is somehow known - * that no thread could possibly be relying on the - * the synchronization properties of this barrier. - **/ - - public synchronized void restart() { - broken_ = false; - ++resets_; - count_ = parties_; - notifyAll(); - } - - - public int parties() { return parties_; } - - /** - * Enter barrier and wait for the other parties()-1 threads. - * @return the arrival index: the number of other parties - * that were still waiting - * upon entry. This is a unique value from zero to parties()-1. - * If it is zero, then the current - * thread was the last party to hit barrier point - * and so was responsible for releasing the others. - * @exception BrokenBarrierException if any other thread - * in any previous or current barrier - * since either creation or the last <code>restart</code> - * operation left the barrier - * prematurely due to interruption or time-out. (If so, - * the <code>broken</code> status is also set.) - * Threads that are notified to have been - * interrupted <em>after</em> being released are not considered - * to have broken the barrier. - * In all cases, the interruption - * status of the current thread is preserved, so can be tested - * by checking <code>Thread.interrupted</code>. - * @exception InterruptedException if this thread was interrupted - * during the barrier, and was the one causing breakage. - * If so, <code>broken</code> status is also set. - **/ - - public int barrier() throws InterruptedException, BrokenBarrierException { - if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition - return doBarrier(false, 0); - } - - /** - * Enter barrier and wait at most msecs for the other parties()-1 threads. - * @return if not timed out, the arrival index: the number of other parties - * that were still waiting - * upon entry. This is a unique value from zero to parties()-1. - * If it is zero, then the current - * thread was the last party to hit barrier point - * and so was responsible for releasing the others. - * @exception BrokenBarrierException - * if any other thread - * in any previous or current barrier - * since either creation or the last <code>restart</code> - * operation left the barrier - * prematurely due to interruption or time-out. (If so, - * the <code>broken</code> status is also set.) - * Threads that are noticed to have been - * interrupted <em>after</em> being released are not considered - * to have broken the barrier. - * In all cases, the interruption - * status of the current thread is preserved, so can be tested - * by checking <code>Thread.interrupted</code>. - * @exception InterruptedException if this thread was interrupted - * during the barrier. If so, <code>broken</code> status is also set. - * @exception TimeoutException if this thread timed out waiting for - * the barrier. If the timeout occured while already in the - * barrier, <code>broken</code> status is also set. - **/ - - public int attemptBarrier(long msecs) - throws InterruptedException, TimeoutException, BrokenBarrierException { -// if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition not necessary checked in doBarrier - return doBarrier(true, msecs); - } - - protected synchronized int doBarrier(boolean timed, long msecs) - throws InterruptedException, TimeoutException, BrokenBarrierException { - - if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition - - int index = --count_; - - if (broken_) { - throw new BrokenBarrierException(index); - } - else if (Thread.interrupted()) { - broken_ = true; - notifyAll(); - throw new InterruptedException(); - } - else if (index == 0) { // tripped - count_ = parties_; - ++resets_; - notifyAll(); - try { - if (barrierCommand_ != null) - barrierCommand_.run(); - return 0; - } - catch (RuntimeException ex) { - broken_ = true; - return 0; - } - } - else if (timed && msecs <= 0) { - broken_ = true; - notifyAll(); - throw new TimeoutException(msecs); - } - else { // wait until next reset - int r = resets_; - long startTime = (timed)? System.currentTimeMillis() : 0; - long waitTime = msecs; - for (;;) { - boolean interrupted = Thread.interrupted(); // GemStoneAddition - try { - wait(waitTime); - } - catch (InterruptedException ex) { - // Only claim that broken if interrupted before reset - if (resets_ == r) { - broken_ = true; - notifyAll(); - throw ex; - } - else { -// Thread.currentThread().interrupt(); // propagate - interrupted = true; // GemStoneAddition - } - } - finally { - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - - if (broken_) - throw new BrokenBarrierException(index); - - else if (r != resets_) - return index; - - else if (timed) { - waitTime = msecs - (System.currentTimeMillis() - startTime); - if (waitTime <= 0) { - broken_ = true; - notifyAll(); - throw new TimeoutException(msecs); - } - } - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/DefaultChannelCapacity.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/DefaultChannelCapacity.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/DefaultChannelCapacity.java deleted file mode 100644 index 3231870..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/DefaultChannelCapacity.java +++ /dev/null @@ -1,58 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -/* - File: DefaultChannelCapacity.java - - Originally written by Doug Lea and released into the public domain. - This may be used for any purposes whatsoever without acknowledgment. - Thanks for the assistance and support of Sun Microsystems Labs, - and everyone contributing, testing, and using this code. - - History: - Date Who What - 11Jun1998 dl Create public version -*/ - -package com.gemstone.org.jgroups.oswego.concurrent; - -/** - * A utility class to set the default capacity of - * BoundedChannel - * implementations that otherwise require a capacity argument - * @see BoundedChannel - * [<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p> - **/ - -public class DefaultChannelCapacity { - - /** The initial value of the default capacity is 1024 **/ - public static final int INITIAL_DEFAULT_CAPACITY = 1024; - - /** the current default capacity **/ - private static final SynchronizedInt defaultCapacity_ = - new SynchronizedInt(INITIAL_DEFAULT_CAPACITY); - - /** - * Set the default capacity used in - * default (no-argument) constructor for BoundedChannels - * that otherwise require a capacity argument. - * @exception IllegalArgumentException if capacity less or equal to zero - */ - public static void set(int capacity) { - if (capacity <= 0) throw new IllegalArgumentException(); - defaultCapacity_.set(capacity); - } - - /** - * Get the default capacity used in - * default (no-argument) constructor for BoundedChannels - * that otherwise require a capacity argument. - * Initial value is <code>INITIAL_DEFAULT_CAPACITY</code> - * @see #INITIAL_DEFAULT_CAPACITY - */ - public static int get() { - return defaultCapacity_.get(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/DirectExecutor.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/DirectExecutor.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/DirectExecutor.java deleted file mode 100644 index a87b8bd..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/DirectExecutor.java +++ /dev/null @@ -1,36 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -/* - File: DirectExecutor.java - - Originally written by Doug Lea and released into the public domain. - This may be used for any purposes whatsoever without acknowledgment. - Thanks for the assistance and support of Sun Microsystems Labs, - and everyone contributing, testing, and using this code. - - History: - Date Who What - 21Jun1998 dl Create public version -*/ - -package com.gemstone.org.jgroups.oswego.concurrent; - -/** - * - * An implementation of Executor that - * invokes the run method of the supplied command and then returns. - * - * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] - **/ -public class DirectExecutor implements Executor { - /** - * Execute the given command directly in the current thread. - **/ - public void execute(Runnable command) throws InterruptedException { - if (Thread.interrupted()) throw new InterruptedException(); - - command.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Executor.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Executor.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Executor.java deleted file mode 100644 index 3625c04..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Executor.java +++ /dev/null @@ -1,70 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -/* - File: Executor.java - - Originally written by Doug Lea and released into the public domain. - This may be used for any purposes whatsoever without acknowledgment. - Thanks for the assistance and support of Sun Microsystems Labs, - and everyone contributing, testing, and using this code. - - History: - Date Who What - 19Jun1998 dl Create public version -*/ - -package com.gemstone.org.jgroups.oswego.concurrent; - -/** - * Interface for objects that execute Runnables, - * as well as various objects that can be wrapped - * as Runnables. - * The main reason to use Executor throughout a program or - * subsystem is to provide flexibility: You can easily - * change from using thread-per-task to using pools or - * queuing, without needing to change most of your code that - * generates tasks. - * <p> - * The general intent is that execution be asynchronous, - * or at least independent of the caller. For example, - * one of the simplest implementations of <code>execute</code> - * (as performed in ThreadedExecutor) - * is <code>new Thread(command).start();</code>. - * However, this interface allows implementations that instead - * employ queueing or pooling, or perform additional - * bookkeeping. - * <p> - * - * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] - **/ -public interface Executor { - /** - * Execute the given command. This method is guaranteed - * only to arrange for execution, that may actually - * occur sometime later; for example in a new - * thread. However, in fully generic use, callers - * should be prepared for execution to occur in - * any fashion at all, including immediate direct - * execution. - * <p> - * The method is defined not to throw - * any checked exceptions during execution of the command. Generally, - * any problems encountered will be asynchronous and - * so must be dealt with via callbacks or error handler - * objects. If necessary, any context-dependent - * catastrophic errors encountered during - * actions that arrange for execution could be accompanied - * by throwing context-dependent unchecked exceptions. - * <p> - * However, the method does throw InterruptedException: - * It will fail to arrange for execution - * if the current thread is currently interrupted. - * Further, the general contract of the method is to avoid, - * suppress, or abort execution if interruption is detected - * in any controllable context surrounding execution. - **/ - public void execute(Runnable command) throws InterruptedException; - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FIFOReadWriteLock.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FIFOReadWriteLock.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FIFOReadWriteLock.java deleted file mode 100644 index 32081a9..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FIFOReadWriteLock.java +++ /dev/null @@ -1,198 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -/* - File: FIFOReadWriteLock.java - - Originally written by Doug Lea and released into the public domain. - This may be used for any purposes whatsoever without acknowledgment. - Thanks for the assistance and support of Sun Microsystems Labs, - and everyone contributing, testing, and using this code. - - History: - Date Who What - 11Jun1998 dl Create public version - 23nov2001 dl Replace main algorithm with fairer - version based on one by Alexander Terekhov -*/ - -package com.gemstone.org.jgroups.oswego.concurrent; - - -/** - * This class implements a policy for reader/writer locks in which - * threads contend in a First-in/First-out manner for access (modulo - * the limitations of FIFOSemaphore, which is used for queuing). This - * policy does not particularly favor readers or writers. As a - * byproduct of the FIFO policy, the <tt>attempt</tt> methods may - * return <tt>false</tt> even when the lock might logically be - * available, but, due to contention, cannot be accessed within the - * given time bound. <p> - * - * This lock is <em>NOT</em> reentrant. Current readers and - * writers should not try to re-obtain locks while holding them. - * <p> - * - * [<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p> - * - * @see FIFOSemaphore -**/ - -public class FIFOReadWriteLock implements ReadWriteLock { - - /** - * Fair Semaphore serving as a kind of mutual exclusion lock. - * Writers acquire on entry, and hold until rwlock exit. - * Readers acquire and release only during entry (but are - * blocked from doing so if there is an active writer). - **/ - protected final FIFOSemaphore entryLock = new FIFOSemaphore(1); - - /** - * Number of threads that have entered read lock. Note that this is - * never reset to zero. Incremented only during acquisition of read - * lock while the "entryLock" is held, but read elsewhere, so is - * declared volatile. - **/ - protected volatile int readers; - - /** - * Number of threads that have exited read lock. Note that this is - * never reset to zero. Accessed only in code protected by - * synchronized(this). When exreaders != readers, the rwlock is - * being used for reading. Else if the entry lock is held, it is - * being used for writing (or in transition). Else it is free. - * Note: To distinguish these states, we assume that fewer than 2^32 - * reader threads can simultaneously execute. - **/ - protected int exreaders; - - protected void acquireRead() throws InterruptedException { -// if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition not necessary checked in acquire - entryLock.acquire(); - ++readers; - entryLock.release(); - } - - protected synchronized void releaseRead() { - /* - If this is the last reader, notify a possibly waiting writer. - Because waits occur only when entry lock is held, at most one - writer can be waiting for this notification. Because increments - to "readers" aren't protected by "this" lock, the notification - may be spurious (when an incoming reader in in the process of - updating the field), but at the point tested in acquiring write - lock, both locks will be held, thus avoiding false alarms. And - we will never miss an opportunity to send a notification when it - is actually needed. - */ - - if (++exreaders == readers) - notify(); - } - - protected void acquireWrite() throws InterruptedException { -// if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition not necessary checked in acquire - // Acquiring entryLock first forces subsequent entering readers - // (as well as writers) to block. - entryLock.acquire(); - - // Only read "readers" once now before loop. We know it won't - // change because we hold the entry lock needed to update it. - int r = readers; - - try { - synchronized(this) { - while (exreaders != r) - wait(); - } - } - catch (InterruptedException ie) { - entryLock.release(); - throw ie; - } - } - - protected void releaseWrite() { - entryLock.release(); - } - - protected boolean attemptRead(long msecs) throws InterruptedException { -// if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition not necessary checked in attempt - if (!entryLock.attempt(msecs)) - return false; - - ++readers; - entryLock.release(); - return true; - } - - protected boolean attemptWrite(long msecs) throws InterruptedException { -// if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition not necessary checked in attempt - long startTime = (msecs <= 0)? 0 : System.currentTimeMillis(); - - if (!entryLock.attempt(msecs)) - return false; - - int r = readers; - - try { - synchronized(this) { - while (exreaders != r) { - long timeLeft = (msecs <= 0)? 0: - msecs - (System.currentTimeMillis() - startTime); - - if (timeLeft <= 0) { - entryLock.release(); - return false; - } - - wait(timeLeft); - } - return true; - } - } - catch (InterruptedException ie) { - entryLock.release(); - throw ie; - } - } - - // support for ReadWriteLock interface - - protected class ReaderSync implements Sync { - public void acquire() throws InterruptedException { -// if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition not necessary checked in acquireRead - acquireRead(); - } - public void release() { - releaseRead(); - } - public boolean attempt(long msecs) throws InterruptedException { -// if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition not necessary checked in attemptRead - return attemptRead(msecs); - } - } - - protected class WriterSync implements Sync { - public void acquire() throws InterruptedException { -// if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition not necessary checked in acquireWrite - acquireWrite(); - } - public void release() { - releaseWrite(); - } - public boolean attempt(long msecs) throws InterruptedException { -// if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition not necessary checked in attemptWrite - return attemptWrite(msecs); - } - } - - protected final Sync readerSync = new ReaderSync(); - protected final Sync writerSync = new WriterSync(); - - public Sync writeLock() { return writerSync; } - public Sync readLock() { return readerSync; } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FIFOSemaphore.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FIFOSemaphore.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FIFOSemaphore.java deleted file mode 100644 index a7f8a0f..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FIFOSemaphore.java +++ /dev/null @@ -1,84 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -/* - File: FIFOSemaphore.java - - Originally written by Doug Lea and released into the public domain. - This may be used for any purposes whatsoever without acknowledgment. - Thanks for the assistance and support of Sun Microsystems Labs, - and everyone contributing, testing, and using this code. - - History: - Date Who What - 11Jun1998 dl Create public version -*/ - - -package com.gemstone.org.jgroups.oswego.concurrent; - -/** - * A First-in/First-out implementation of a Semaphore. - * Waiting requests will be satisified in - * the order that the processing of those requests got to a certain point. - * If this sounds vague it is meant to be. FIFO implies a - * logical timestamping at some point in the processing of the - * request. To simplify things we don't actually timestamp but - * simply store things in a FIFO queue. Thus the order in which - * requests enter the queue will be the order in which they come - * out. This order need not have any relationship to the order in - * which requests were made, nor the order in which requests - * actually return to the caller. These depend on Java thread - * scheduling which is not guaranteed to be predictable (although - * JVMs tend not to go out of their way to be unfair). - * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] -**/ - -public class FIFOSemaphore extends QueuedSemaphore { - - /** - * Create a Semaphore with the given initial number of permits. - * Using a seed of one makes the semaphore act as a mutual exclusion lock. - * Negative seeds are also allowed, in which case no acquires will proceed - * until the number of releases has pushed the number of permits past 0. - **/ - - public FIFOSemaphore(long initialPermits) { - super(new FIFOWaitQueue(), initialPermits); - } - - /** - * Simple linked list queue used in FIFOSemaphore. - * Methods are not synchronized; they depend on synch of callers - **/ - - protected static class FIFOWaitQueue extends WaitQueue { - protected WaitNode head_ = null; - protected WaitNode tail_ = null; - - @Override // GemStoneAddition - protected void insert(WaitNode w) { - if (tail_ == null) - head_ = tail_ = w; - else { - tail_.next = w; - tail_ = w; - } - } - - @Override // GemStoneAddition - protected WaitNode extract() { - if (head_ == null) - return null; - else { - WaitNode w = head_; - head_ = w.next; - if (head_ == null) tail_ = null; - w.next = null; - return w; - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FJTask.java ---------------------------------------------------------------------- diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FJTask.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FJTask.java deleted file mode 100644 index c7e9827..0000000 --- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FJTask.java +++ /dev/null @@ -1,535 +0,0 @@ -/** Notice of modification as required by the LGPL - * This file was modified by Gemstone Systems Inc. on - * $Date$ - **/ -/* - File: Task.java - - Originally written by Doug Lea and released into the public domain. - This may be used for any purposes whatsoever without acknowledgment. - Thanks for the assistance and support of Sun Microsystems Labs, - and everyone contributing, testing, and using this code. - - History: - Date Who What - 7Jan1999 dl first release - 14jan1999 dl simplify start() semantics; - improve documentation - 18Jan1999 dl Eliminate useless time-based waits. - 7Mar1999 dl Add reset method, - add array-based composite operations - 27Apr1999 dl Rename -*/ - -package com.gemstone.org.jgroups.oswego.concurrent; - - -/** - * Abstract base class for Fork/Join Tasks. - * - * <p> - * FJTasks are lightweight, stripped-down analogs of Threads. - * Many FJTasks share the same pool of Java threads. This is - * supported by the FJTaskRunnerGroup and FJTaskRunner classes, that - * mainly contain - * methods called only internally by FJTasks. - * FJTasks support versions of the most common methods found in class Thread, - * including start(), yield() and join(). However, they - * don't support priorities, ThreadGroups or other bookkeeping - * or control methods of class Thread. - * <p> - * FJTasks should normally be defined by subclassing and adding a run() method. - * Alternatively, static inner class <code>Wrap(Runnable r)</code> - * can be used to - * wrap an existing Runnable object in a FJTask. - * <p> - * <code>FJTaskRunnerGroup.execute(FJTask)</code> can be used to - * initiate a FJTask from a non-FJTask thread. - * And <code>FJTaskRunnerGroup.invoke(FJTask)</code> can be used to initiate - * a FJTask and then wait for it to complete before returning. - * These are the only entry-points from normal threads to FJTasks. - * Most FJTask methods themselves may only be called from within running FJTasks. - * They throw ClassCastExceptions if they are not, - * reflecting the fact that these methods - * can only be executed using FJTaskRunner threads, not generic - * java.lang.Threads. - * <p> - * There are three different ways to run a FJTask, - * with different scheduling semantics: - * <ul> - * <li> FJTask.start() (as well as FJTaskRunnerGroup.execute(FJTask)) - * behaves pretty much like Thread.start(). It enqueues a task to be - * run the next time any FJTaskRunner thread is otherwise idle. - * It maintains standard FIFO ordering with respect to - * the group of worker threads. - * <li> FJTask.fork() (as well as the two-task spawning method, - * coInvoke(task1, task2), and the array version - * coInvoke(FJTask[] tasks)) starts a task - * that will be executed in - * procedure-call-like LIFO order if executed by the - * same worker thread as the one that created it, but is FIFO - * with respect to other tasks if it is run by - * other worker threads. That is, earlier-forked - * tasks are preferred to later-forked tasks by other idle workers. - * Fork() is noticeably faster than start(), but can only be - * used when these scheduling semantics are acceptable. - * <li> FJTask.invoke(FJTask) just executes the run method - * of one task from within another. It is the analog of a - * direct call. - * </ul> - * <p> - * The main economies of FJTasks stem from the fact that - * FJTasks do not support blocking operations of any kind. - * FJTasks should just run to completion without - * issuing waits or performing blocking IO. - * There are several styles for creating the run methods that - * execute as tasks, including - * event-style methods, and pure computational methods. - * Generally, the best kinds of FJTasks are those that in turn - * generate other FJTasks. - * <p> - * There is nothing actually - * preventing you from blocking within a FJTask, and very short waits/blocks are - * completely well behaved. But FJTasks are not designed - * to support arbitrary synchronization - * since there is no way to suspend and resume individual tasks - * once they have begun executing. FJTasks should also be finite - * in duration -- they should not contain infinite loops. - * FJTasks that might need to perform a blocking - * action, or hold locks for extended periods, or - * loop forever can instead create normal - * java Thread objects that will do so. FJTasks are just not - * designed to support these things. - * FJTasks may however yield() control to allow their FJTaskRunner threads - * to run other tasks, - * and may wait for other dependent tasks via join(). These - * are the only coordination mechanisms supported by FJTasks. - * <p> - * FJTasks, and the FJTaskRunners that execute them are not - * intrinsically robust with respect to exceptions. - * A FJTask that aborts via an exception does not automatically - * have its completion flag (isDone) set. - * As with ordinary Threads, an uncaught exception will normally cause - * its FJTaskRunner thread to die, which in turn may sometimes - * cause other computations being performed to hang or abort. - * You can of course - * do better by trapping exceptions inside the run methods of FJTasks. - * <p> - * The overhead differences between FJTasks and Threads are substantial, - * especially when using fork() or coInvoke(). - * FJTasks can be two or three orders of magnitude faster than Threads, - * at least when run on JVMs with high-performance garbage collection - * (every FJTask quickly becomes garbage) and good native thread support. - * <p> - * Given these overhead savings, you might be tempted to use FJTasks for - * everything you would use a normal Thread to do. Don't. Java Threads - * remain better for general purpose thread-based programming. Remember - * that FJTasks cannot be used for designs involving arbitrary blocking - * synchronization or I/O. Extending FJTasks to support such capabilities - * would amount to re-inventing the Thread class, and would make them - * less optimal in the contexts that they were designed for. - * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] - * <p> - * @see FJTaskRunner - * @see FJTaskRunnerGroup - **/ - -public abstract class FJTask implements Runnable { - - /** - * The only status information associated with FJTasks is whether - * the they are considered to have completed. - * It is set true automatically within - * FJTaskRunner methods upon completion - * of the run method, or manually via cancel. - **/ - - private volatile boolean done; // = false; - - /** - * Return the FJTaskRunner thread running the current FJTask. - * Most FJTask methods are just relays to their current - * FJTaskRunners, that perform the indicated actions. - * @exception ClassCastException if caller thread is not a - * running FJTask. - **/ - - public static FJTaskRunner getFJTaskRunner() { - return (FJTaskRunner)(Thread.currentThread()); - } - - /** - * Return the FJTaskRunnerGroup of the thread running the current FJTask. - * @exception ClassCastException if caller thread is not a - * running FJTask. - **/ - public static FJTaskRunnerGroup getFJTaskRunnerGroup() { - return getFJTaskRunner().getGroup(); - } - - - /** - * Return true if current task has terminated or been cancelled. - * The method is a simple analog of the Thread.isAlive() - * method. However, it reports true only when the task has terminated - * or has been cancelled. It does not distinguish these two cases. - * And there is no way to determine whether a FJTask has been started - * or is currently executing. - **/ - - public final boolean isDone() { return done; } - - /** - * Indicate termination. Intended only to be called by FJTaskRunner. - * FJTasks themselves should use (non-final) method - * cancel() to suppress execution. - **/ - - protected final void setDone() { done = true; } - - /** - * Set the termination status of this task. This simple-minded - * analog of Thread.interrupt - * causes the task not to execute if it has not already been started. - * Cancelling a running FJTask - * has no effect unless the run method itself uses isDone() - * to probe cancellation and take appropriate action. - * Individual run() methods may sense status and - * act accordingly, normally by returning early. - **/ - - public void cancel() { setDone(); } - - - /** - * Clear the termination status of this task. - * This method is intended to be used - * only as a means to allow task objects to be recycled. It should - * be called only when you are sure that the previous - * execution of this task has terminated and, if applicable, has - * been joined by all other waiting tasks. Usage in any other - * context is a very bad idea. - **/ - - public void reset() { done = false; } - - - /** - * Execute this task. This method merely places the task in a - * group-wide scheduling queue. - * It will be run - * the next time any TaskRunner thread is otherwise idle. - * This scheduling maintains FIFO ordering of started tasks - * with respect to - * the group of worker threads. - * @exception ClassCastException if caller thread is not - * running in a FJTaskRunner thread. - **/ - - public void start() { getFJTaskRunnerGroup().executeTask(this); } - - - /** - * Arrange for execution of a strictly dependent task. - * The task that will be executed in - * procedure-call-like LIFO order if executed by the - * same worker thread, but is FIFO with respect to other tasks - * forked by this thread when taken by other worker threads. - * That is, earlier-forked - * tasks are preferred to later-forked tasks by other idle workers. - * <p> - * Fork() is noticeably - * faster than start(). However, it may only - * be used for strictly dependent tasks -- generally, those that - * could logically be issued as straight method calls without - * changing the logic of the program. - * The method is optimized for use in parallel fork/join designs - * in which the thread that issues one or more forks - * cannot continue until at least some of the forked - * threads terminate and are joined. - * @exception ClassCastException if caller thread is not - * running in a FJTaskRunner thread. - **/ - - public void fork() { getFJTaskRunner().push(this); } - - /** - * Allow the current underlying FJTaskRunner thread to process other tasks. - * <p> - * Spinloops based on yield() are well behaved so long - * as the event or condition being waited for is produced via another - * FJTask. Additionally, you must never hold a lock - * while performing a yield or join. (This is because - * multiple FJTasks can be run by the same Thread during - * a yield. Since java locks are held per-thread, the lock would not - * maintain the conceptual exclusion you have in mind.) - * <p> - * Otherwise, spinloops using - * yield are the main construction of choice when a task must wait - * for a condition that it is sure will eventually occur because it - * is being produced by some other FJTask. The most common - * such condition is built-in: join() repeatedly yields until a task - * has terminated after producing some needed results. You can also - * use yield to wait for callbacks from other FJTasks, to wait for - * status flags to be set, and so on. However, in all these cases, - * you should be confident that the condition being waited for will - * occur, essentially always because it is produced by - * a FJTask generated by the current task, or one of its subtasks. - * - * @exception ClassCastException if caller thread is not - * running in a FJTaskRunner thread. - **/ - - public static void yield() { getFJTaskRunner().taskYield(); } - - /** - * Yield until this task isDone. - * Equivalent to <code>while(!isDone()) yield(); </code> - * @exception ClassCastException if caller thread is not - * running in a FJTaskRunner thread. - **/ - - public void join() { getFJTaskRunner().taskJoin(this); } - - /** - * Immediately execute task t by calling its run method. Has no - * effect if t has already been run or has been cancelled. - * It is equivalent to calling t.run except that it - * deals with completion status, so should always be used - * instead of directly calling run. - * The method can be useful - * when a computation has been packaged as a FJTask, but you just need to - * directly execute its body from within some other task. - **/ - - public static void invoke(FJTask t) { - if (!t.isDone()) { - t.run(); - t.setDone(); - } - } - - /** - * Fork both tasks and then wait for their completion. It behaves as: - * <pre> - * task1.fork(); task2.fork(); task2.join(); task1.join(); - * </pre> - * As a simple classic example, here is - * a class that computes the Fibonacci function: - * <pre> - * public class Fib extends FJTask { - * - * // Computes fibonacci(n) = fibonacci(n-1) + fibonacci(n-2); for n> 1 - * // fibonacci(0) = 0; - * // fibonacci(1) = 1. - * - * // Value to compute fibonacci function for. - * // It is replaced with the answer when computed. - * private volatile int number; - * - * public Fib(int n) { number = n; } - * - * public int getAnswer() { - * if (!isDone()) throw new Error("Not yet computed"); - * return number; - * } - * - * public void run() { - * int n = number; - * if (n > 1) { - * Fib f1 = new Fib(n - 1); - * Fib f2 = new Fib(n - 2); - * - * coInvoke(f1, f2); // run these in parallel - * - * // we know f1 and f2 are computed, so just directly access numbers - * number = f1.number + f2.number; - * } - * } - * - * public static void main(String[] args) { // sample driver - * try { - * int groupSize = 2; // 2 worker threads - * int num = 35; // compute fib(35) - * FJTaskRunnerGroup group = new FJTaskRunnerGroup(groupSize); - * Fib f = new Fib(num); - * group.invoke(f); - * int result = f.getAnswer(); - * System.out.println(" Answer: " + result); - * } - * catch (InterruptedException ex) { - * System.out.println("Interrupted"); - * } - * } - * } - * </pre> - * - * @exception ClassCastException if caller thread is not - * running in a FJTaskRunner thread. - **/ - - public static void coInvoke(FJTask task1, FJTask task2) { - getFJTaskRunner().coInvoke(task1, task2); - } - - - /** - * Fork all tasks in array, and await their completion. - * Behaviorally equivalent to: - * <pre> - * for (int i = 0; i < tasks.length; ++i) tasks[i].fork(); - * for (int i = 0; i < tasks.length; ++i) tasks[i].join(); - * </pre> - **/ - - public static void coInvoke(FJTask[] tasks) { - getFJTaskRunner().coInvoke(tasks); - } - - /** - * A FJTask that holds a Runnable r, and calls r.run when executed. - * The class is a simple utilty to allow arbitrary Runnables - * to be used as FJTasks. - **/ - - public static class Wrap extends FJTask { - protected final Runnable runnable; - public Wrap(Runnable r) { runnable = r; } - public void run() { runnable.run(); } - } - - - /** - * A <code>new Seq</code>, when executed, - * invokes each task provided in the constructor, in order. - * The class is a simple utility - * that makes it easier to create composite FJTasks. - **/ - public static class Seq extends FJTask { - protected final FJTask[] tasks; - - /** - * Construct a Seq that, when executed, will process each of the - * tasks in the tasks array in order - **/ - public Seq(FJTask[] tasks) { - this.tasks = tasks; - } - - /** - * Two-task constructor, for compatibility with previous release. - **/ - public Seq(FJTask task1, FJTask task2) { - this.tasks = new FJTask[] { task1, task2 }; - } - - public void run() { - for (int i = 0; i < tasks.length; ++i) FJTask.invoke(tasks[i]); - } - } - - /** - * Construct and return a FJTask object that, when executed, will - * invoke the tasks in the tasks array in array order - **/ - - public static FJTask seq(FJTask[] tasks) { - return new Seq(tasks); - } - - /** - * A <code>new Par</code>, when executed, - * runs the tasks provided in the constructor in parallel using - * coInvoke(tasks). - * The class is a simple utility - * that makes it easier to create composite FJTasks. - **/ - public static class Par extends FJTask { - protected final FJTask[] tasks; - - /** - * Construct a Seq that, when executed, will process each of the - * tasks in the tasks array in parallel - **/ - public Par(FJTask[] tasks) { - this.tasks = tasks; - } - - /** - * Two-task constructor, for compatibility with previous release. - **/ - public Par(FJTask task1, FJTask task2) { - this.tasks = new FJTask[] { task1, task2 }; - } - - - public void run() { - FJTask.coInvoke(tasks); - } - } - - - /** - * Construct and return a FJTask object that, when executed, will - * invoke the tasks in the tasks array in parallel using coInvoke - **/ - public static FJTask par(FJTask[] tasks) { - return new Par(tasks); - } - - /** - * A <code>new Seq2(task1, task2)</code>, when executed, - * invokes task1 and then task2, in order. - * The class is a simple utility - * that makes it easier to create composite Tasks. - **/ - public static class Seq2 extends FJTask { - protected final FJTask fst; - protected final FJTask snd; - public Seq2(FJTask task1, FJTask task2) { - fst = task1; - snd = task2; - } - public void run() { - FJTask.invoke(fst); - FJTask.invoke(snd); - } - } - - /** - * Construct and return a FJTask object that, when executed, will - * invoke task1 and task2, in order - **/ - - public static FJTask seq(FJTask task1, FJTask task2) { - return new Seq2(task1, task2); - } - - /** - * A <code>new Par(task1, task2)</code>, when executed, - * runs task1 and task2 in parallel using coInvoke(task1, task2). - * The class is a simple utility - * that makes it easier to create composite Tasks. - **/ - public static class Par2 extends FJTask { - protected final FJTask fst; - protected final FJTask snd; - public Par2(FJTask task1, FJTask task2) { - fst = task1; - snd = task2; - } - public void run() { - FJTask.coInvoke(fst, snd); - } - } - - - /** - * Construct and return a FJTask object that, when executed, will - * invoke task1 and task2, in parallel - **/ - public static FJTask par(FJTask task1, FJTask task2) { - return new Par2(task1, task2); - } - -}