On 12/12/2010 5:48 PM, Peter Firmstone wrote:
Patricia Shanahan wrote:
On 12/3/2010 7:15 AM, Gregg Wonderly wrote:
...
> The important issue in FastList is that it was written with the JDK1.4
> memory model. After moving River to Java 1.5, we'd have the JSR166 work
> and the new, consistent memory model where volatile has a true meaning.
> However, this code in particular is quite complex as you have noted, so
> even adjusting to the new memory model could be problematic.
I've just run a modified, simplified version of my test with java
version "1.4.2_19" and an unmodified copy of FastList, and I still get
the NullPointerException. This changes my thinking a bit. I had been
working from the assumption that the issue was to do with the changes
in memory model between 1.4 and 1.5. I now have to consider the
possibility of a more basic bug that is independent of the memory model.
If there is anyone with a FastList or Java memory model background who
would like to help, please reply. I would welcome another set of eyes
on the code, and a cross check on my conclusions so far about how
FastList is supposed to work. There seems to be a critical invariant
that gets broken, and once that happens we are on track to either a
NullPointerException or dropped items.
I can supply my test as a unit test (JDK 1.6, JUnit 4) and as a main
program (JDK 1.4 or later). In both forms, all it does is fire up a
mixture of threads that repeatedly add items to a FastList and threads
that repeatedly remove the first item they can from the FastList.
Failures seem to require simultaneous adds and removes.
If I don't nail this problem fairly soon, I may abandon the current,
rather complicated, code and switch to writing a concurrent high
performance FastList substitute for 1.5 or later.
Patricia
I'll have a look tonight, no promises though ;)
I'm attaching the simplified test application main program that can run,
and fail, under JRE 1.4, with no need for Junit.
Here's what I think I know. First of all, I have found some dubious
synchronization situations. However, fixing all the things I have so far
found of that type has only reduced the failure rate, not eliminated
failures. That could be caused by changing timings without having any
impact on the root cause.
The key invariant relates to a thread that is doing a scan, starting
with a call to head() and proceeding through a series of calls to next()
to examine nodes. The head() call sets up, as the thread's guard node, a
node that was the tail at some point during the head() call. The
invariant is that the series of next() calls will reach the guard node
before finding a null next pointer, which indicates the actual tail.
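To make the invariant concrete, here is a hypothetical, single-threaded model. GuardInvariantModel, its dummy head node, startScan() and guardReachable() are my own illustrative names, not FastList's code. startScan() plays the role of head() by remembering the current tail as the scan's guard, and guardReachable() checks that following next pointers from the head meets the guard before hitting null.

```java
/** Hypothetical single-threaded model of the guard invariant; not FastList's actual code. */
final class GuardInvariantModel {
    static final class Node {
        Node next;   // null only at the actual tail
    }

    final Node head = new Node();   // dummy head node (an assumption of this model)
    Node tail = head;

    /** Append a new node at the tail, as add() does under the list lock. */
    synchronized Node add() {
        Node n = new Node();
        tail.next = n;
        tail = n;
        return n;
    }

    /** Like head(): remember whatever node is the tail right now as this scan's guard. */
    synchronized Node startScan() {
        return tail;
    }

    /** The invariant: following next pointers from head reaches the guard before a null next. */
    boolean guardReachable(Node guard) {
        for (Node n = head; n != null; n = n.next) {
            if (n == guard) return true;
        }
        return false;   // fell off the end first: the symptom seen in head()
    }
}
```

Nodes appended after startScan() do not affect the invariant; only a next-pointer write that bypasses the guard can break it.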
The remove call does not really remove anything; it merely marks the
node removed. Removed nodes are unlinked as a side effect of scans,
during head() and next() calls, but only if they are not guard nodes.
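The mark-then-unlink scheme can be sketched with another hypothetical model. LazyUnlinkModel and its methods are illustrative names, and the identity set of guards is a simplification standing in for the per-node WeakHashMap of threads. remove() only sets a flag; scan() bypasses removed nodes as it walks, except guard nodes and, as an extra assumption of this sketch, the current tail.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

/** Hypothetical single-threaded model of mark-then-unlink removal; not FastList's actual code. */
final class LazyUnlinkModel {
    static final class Node {
        final int value;
        Node next;
        boolean removed;   // logical removal mark
        Node(int value) { this.value = value; }
    }

    private final Node head = new Node(-1);   // dummy head, never removed
    private Node tail = head;
    // Simplification: one shared set of guard nodes instead of a per-node map of threads.
    private final Set<Node> guards =
            Collections.newSetFromMap(new IdentityHashMap<Node, Boolean>());

    Node add(int value) {
        Node n = new Node(value);
        tail.next = n;
        tail = n;
        return n;
    }

    void markGuard(Node n) { guards.add(n); }

    /** remove() only marks the node; nothing is unlinked here. */
    boolean remove(Node n) {
        if (n.removed) return false;   // another caller got there first
        n.removed = true;
        return true;
    }

    /** A scan unlinks removed nodes as a side effect, but never a guard node or the tail. */
    List<Integer> scan() {
        List<Integer> live = new ArrayList<Integer>();
        Node prev = head;
        Node cur = head.next;
        while (cur != null) {
            if (cur.removed && !guards.contains(cur) && cur != tail) {
                prev.next = cur.next;   // bypass the removed node
            } else {
                if (!cur.removed) live.add(cur.value);
                prev = cur;
            }
            cur = cur.next;
        }
        return live;
    }

    /** Number of nodes still physically linked after the dummy head. */
    int physicalLength() {
        int count = 0;
        for (Node n = head.next; n != null; n = n.next) count++;
        return count;
    }
}
```

In this model a removed guard node stays linked until it stops being a guard, which is what lets a scan that owns it keep its position.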
There are additional complications in the restart and reap methods, but
we can ignore them for now - my test does not use them.
Once a guard node is lost, the synchronization breaks down completely,
because, for example, insertion at the tail is protected by synchronization
on the FastList instance, while unlinking of a removed node in the middle is
protected, to the extent it is protected at all, by synchronization on
the FastList.Node instance that is being removed.
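The point about two different monitors can be shown in isolation. In this hypothetical demo (MonitorMismatchDemo, listLock and nodeLock are illustrative names), two plain Objects stand in for the FastList instance and a FastList.Node. A second thread gets all the way through synchronized(nodeLock) while the first thread still holds listLock, so an append at the tail and an unlink in the middle do not exclude each other.

```java
/** Hypothetical demo: threads synchronizing on different monitors do not exclude each other. */
final class MonitorMismatchDemo {
    static final Object listLock = new Object();   // stands in for the FastList instance
    static final Object nodeLock = new Object();   // stands in for the FastList.Node being unlinked

    /** Returns true if another thread got through synchronized(nodeLock) while we held listLock. */
    static boolean unlinkRanDuringAdd() {
        final boolean[] entered = new boolean[1];
        synchronized (listLock) {                   // e.g. add() appending at the tail
            Thread unlinker = new Thread(() -> {
                synchronized (nodeLock) {           // e.g. a scan unlinking a removed node
                    entered[0] = true;
                }
            });
            unlinker.start();
            try {
                unlinker.join();                    // returns: nothing in unlinker needs listLock
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return entered[0];   // join() makes the unlinker's write visible here
    }
}
```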
The commonest failure symptom is a scan reaching the null next pointer
at the end of the FIFO during head(), without first finding the guard
node it just set up. An alternative form of failure is loss of some
entries - they get added, but the remove threads never find them. The
second symptom is predominant in the JavaSpaces stress test that got me
started on this. Messing up a next pointer could cause either.
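One hypothetical interleaving shows how a single stale next-pointer write can hide an entry. This is a deterministic replay on a toy node class (LostNextPointerReplay is an illustrative name), not a diagnosis of the actual FastList bug: an unlinking thread reads X.next, an adding thread then appends Y after X, and the unlinker finishes with its stale value.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical step-by-step replay of a lost update on a next pointer; not FastList's code. */
final class LostNextPointerReplay {
    static final class Node {
        final String name;
        Node next;
        Node(String name) { this.name = name; }
    }

    /** Returns the names reachable from the head after the interleaving below. */
    static List<String> replay() {
        Node head = new Node("head");
        Node x = new Node("X");        // X is the tail and has been marked removed
        head.next = x;

        // Step 1, unlinking thread (holding only X's lock): reads X's successor.
        Node succ = x.next;            // null at this moment

        // Step 2, adding thread (holding only the list lock): appends Y after X.
        Node y = new Node("Y");
        x.next = y;

        // Step 3, unlinking thread: finishes the unlink using its stale read.
        head.next = succ;              // Y is now unreachable from head

        List<String> reachable = new ArrayList<String>();
        for (Node n = head; n != null; n = n.next) reachable.add(n.name);
        return reachable;
    }
}
```

Here Y is simply dropped. If Y were also some thread's guard node, a scan from the head would hit the null next pointer before reaching it, which is the other symptom.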
Incidentally, I'm curious why it uses such a fragile design, in which the
state of a scan is partly tracked per thread, when it seems like an
obvious candidate for the Iterator pattern. Callers do need to be able to
find out whether a remove call succeeded (the node may have been removed
by another thread), but that could be done in an interface extending
Iterator. The WeakHashMap in a node that keeps track of the threads for
which it is a guard would instead track the Iterator. There would be no
need for thread-local storage; the same data could be kept in the
Iterator.
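A minimal sketch of that idea, assuming Java 7 or later and java.util.concurrent. ClaimingIterator, tryRemove() and ClaimableFifo are hypothetical names, and ConcurrentLinkedQueue is swapped in for FastList's own linking. Each iterator keeps its own scan position, so there is no thread-local state or per-node map of guarding threads; an AtomicBoolean mark decides which caller's remove wins.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical Iterator extension: remove reports whether this caller actually removed the element. */
interface ClaimingIterator<E> extends Iterator<E> {
    /** Removes the element last returned by next(); false if another caller removed it first. */
    boolean tryRemove();
}

/** Hypothetical FIFO built on ConcurrentLinkedQueue; scan state lives in each iterator. */
final class ClaimableFifo<E> {
    private static final class Entry<T> {
        final T value;
        final AtomicBoolean removed = new AtomicBoolean(false);   // logical removal mark
        Entry(T value) { this.value = value; }
    }

    private final ConcurrentLinkedQueue<Entry<E>> entries = new ConcurrentLinkedQueue<>();

    void add(E value) { entries.add(new Entry<>(value)); }

    ClaimingIterator<E> iterator() {
        final Iterator<Entry<E>> it = entries.iterator();   // weakly consistent, no CME
        return new ClaimingIterator<E>() {
            private Entry<E> pending;   // next unremoved entry, found in advance by hasNext()
            private Entry<E> last;      // entry returned by the most recent next()

            public boolean hasNext() {
                while (pending == null && it.hasNext()) {
                    Entry<E> e = it.next();
                    if (!e.removed.get()) pending = e;   // skip entries already marked removed
                }
                return pending != null;
            }

            public E next() {
                if (!hasNext()) throw new NoSuchElementException();
                last = pending;
                pending = null;
                return last.value;
            }

            public boolean tryRemove() {
                if (last == null) throw new IllegalStateException("no element to remove");
                Entry<E> target = last;
                last = null;                                        // at most one remove per next()
                if (!target.removed.compareAndSet(false, true)) {
                    return false;                                   // another caller won the race
                }
                entries.remove(target);                             // physical unlink by identity
                return true;
            }

            public void remove() {
                tryRemove();   // plain Iterator.remove cannot report who won
            }
        };
    }
}
```

With this shape, the test's removeOne() would become a loop over iterator() that stops at the first tryRemove() returning true.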
Thanks for any time you can spend looking at this.
Patricia
package com.sun.jini.outrigger;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import com.sun.jini.outrigger.FastList.Node;
public class FastListMain {
    public static void main(String[] args) throws InterruptedException {
        new FastListMain().manyAddRemoveTest();
    }

    /**
     * Main program version of FastListTest.
     * This test attempts to add many items to a FastList, using several
     * threads, with the same number of threads working to remove them.
     */
    public void manyAddRemoveTest() throws InterruptedException {
        for (int i = 0; i < 100000; i++) {
            int threadCount = 100;
            int nodesPerThread = 10000;
            FastList list = new FastList();
            Set addThreads = new HashSet();
            Set removeThreads = new HashSet();
            Set removers = new HashSet();
            SortedSet found = new TreeSet();
            System.err.println("Creating threads for pass " + i);
            for (int thread = 0; thread < threadCount; thread++) {
                Thread t;
                t = new Thread(new NodeAdder(list, thread, nodesPerThread));
                addThreads.add(t);
                //t.setUncaughtExceptionHandler(handler);
                t.start();
                final NodeRemover nodeRemover = new NodeRemover(list, thread,
                        nodesPerThread);
                t = new Thread(nodeRemover);
                removeThreads.add(t);
                removers.add(nodeRemover);
                //t.setUncaughtExceptionHandler(handler);
                t.start();
            }
            System.err.println("Waiting for add threads to finish");
            Iterator threads;
            threads = addThreads.iterator();
            while (threads.hasNext()) {
                ((Thread) threads.next()).join();
            }
            System.err.println("Waiting for remove threads to finish");
            threads = removeThreads.iterator();
            while (threads.hasNext()) {
                ((Thread) threads.next()).join();
            }
            System.err.println("Collecting results");
            Iterator rIt = removers.iterator();
            while (rIt.hasNext()) {
                found.addAll(((NodeRemover) rIt.next()).getNodes());
            }
            if (threadCount * (long) nodesPerThread != found.size()) {
                System.err.println("Count mismatch");
            }
        }
    }
    private static class ListNode extends FastList.Node implements Comparable {
        int threadId;
        int nodeId;

        ListNode(int threadId, int nodeId) {
            this.threadId = threadId;
            this.nodeId = nodeId;
        }

        public int compareTo(ListNode o) {
            if (this.threadId > o.threadId) {
                return 1;
            }
            if (this.threadId < o.threadId) {
                return -1;
            }
            if (this.nodeId > o.nodeId) {
                return 1;
            }
            if (this.nodeId < o.nodeId) {
                return -1;
            }
            return 0;
        }

        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + nodeId;
            result = prime * result + threadId;
            return result;
        }

        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            ListNode other = (ListNode) obj;
            if (nodeId != other.nodeId)
                return false;
            if (threadId != other.threadId)
                return false;
            return true;
        }

        public int compareTo(Object arg0) {
            return compareTo((ListNode) arg0);
        }
    }
    private static class NodeAdder implements Runnable {
        FastList list;
        int threadId;
        int nodes;
        List myNodes = new ArrayList();

        /**
         * Create a NodeAdder whose run method adds the specified number of
         * nodes to the specified list.
         *
         * @param list the FastList to add to
         * @param threadId identifier stored in each node this adder creates
         * @param nodes number of nodes to add
         */
        NodeAdder(FastList list, int threadId, int nodes) {
            this.list = list;
            this.threadId = threadId;
            this.nodes = nodes;
        }

        public void run() {
            for (int i = 0; i < nodes; i++) {
                myNodes.add(new ListNode(threadId, i));
            }
            Iterator nIt = myNodes.iterator();
            while (nIt.hasNext()) {
                list.add((Node) nIt.next());
                try {
                    Thread.sleep(0, 1);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    }
    private static class NodeRemover implements Runnable {
        FastList list;
        int threadId;
        int nodes;
        List myNodes = new ArrayList();

        /**
         * Create a NodeRemover whose run method removes the specified number
         * of nodes from the specified list.
         *
         * @param list the FastList to remove from
         * @param threadId identifier for this remover
         * @param nodes number of nodes this remover tries to remove
         */
        NodeRemover(FastList list, int threadId, int nodes) {
            this.list = list;
            this.threadId = threadId;
            this.nodes = nodes;
        }

        public void run() {
            long maxTries = 1000 * nodes;
            long tries = 0;
            while (myNodes.size() < nodes && tries < maxTries) {
                // if (threadId % 1 == 0 && tries % 100 == 0) {
                //     System.err.printf("Thread %d try %d found %d%n", threadId,
                //             tries, myNodes.size());
                // }
                tries++;
                removeOne();
            }
            if (myNodes.size() < nodes) {
                // System.err.printf("Remover %d found %d nodes, expected %d%n",
                //         threadId, myNodes.size(), nodes);
                try {
                    Thread.sleep(1 * 1000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                tries = 0;
                while (myNodes.size() < nodes && tries < maxTries) {
                    // if (threadId % 1 == 0 && tries % 100 == 0) {
                    //     System.err.printf("Thread %d try %d found %d%n",
                    //             threadId, tries, myNodes.size());
                    // }
                    tries++;
                    removeOne();
                }
            }
        }

        private void removeOne() {
            for (FastList.Node n = list.head(); n != null; n = n.next()) {
                if (!n.removed()) {
                    if (list.remove(n)) {
                        synchronized (myNodes) {
                            myNodes.add((ListNode) n);
                        }
                        break;
                    }
                }
            }
        }

        /**
         * Return a list of all nodes this remover has removed so far.
         *
         * @return a snapshot copy of the removed nodes
         */
        public List getNodes() {
            synchronized (myNodes) {
                return new ArrayList(myNodes);
            }
        }
    }
    // Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
    //
    //     public void uncaughtException(Thread thread, Throwable thrown) {
    //         System.err.println("Throw in thread " + thread);
    //         System.err.println(thrown);
    //     }
    // };
}