Here you go.
On 6/22/07, Alan McGovern <[EMAIL PROTECTED]> wrote:
Can I see your test case?
/*
Copyright (C) 2005 P. Oscar Boykin <[EMAIL PROTECTED]>, University of Florida
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
//#define DEBUG
using System.Collections;
using System.Threading;
using System;
namespace Brunet
{
/**
* This class offers a means to pass objects in a queue
* between threads (or in the same thread). The Dequeue
* method will block until there is something in the Queue
*/
public class BlockingQueue {
/**
 * A node of the singly linked list that holds the queued objects.
 * The same type is also used for the list of recycled nodes (_pool).
 */
public class Entry {
//The object carried by this node
public object Value;
//The node that follows this one, or null if this is the last node
public Entry Next;
}
/**
 * Creates an open, empty queue whose recycling pool is also empty
 */
public BlockingQueue() {
  //Synchronization objects
  _sync = new object();
  _are = new AutoResetEvent(false);
  //Queue state: open, no nodes
  _closed = false;
  _head = _tail = null;
  _count = 0;
  //Nothing has been recycled yet
  _pool = null;
  _pool_count = 0;
}
//Signalled by Enqueue when an item lands in an empty queue, and by Close
protected AutoResetEvent _are;
//Set to true by Close
protected bool _closed;
//Lock object taken by the public methods before touching the fields below
protected readonly object _sync;
//First node of the queue (the next one to be dequeued), null when empty
protected Entry _head;
//Last node of the queue, null when empty
protected Entry _tail;
//Number of items currently queued
protected int _count;
//Head of the linked list of recycled Entry objects
protected Entry _pool;
//Number of Entry objects currently held in the pool
protected int _pool_count;
//AddToPool stops recycling once the pool holds this many entries
protected const int MAX_POOL = 100;
/**
 * True once Close() has been called. The flag is read while holding
 * the queue lock.
 */
public bool Closed {
  get {
    bool closed_now;
    lock( _sync ) {
      closed_now = _closed;
    }
    return closed_now;
  }
}
/**
* When an item is enqueued, this event is fired
*/
public event EventHandler EnqueueEvent;
/* **********************************************
* Here are all the methods
*/
/**
 * Drops every queued item. The dropped nodes are not returned to the
 * recycling pool and the closed flag is left as it is.
 */
public void Clear() {
  lock( _sync ) {
    _count = 0;
    _head = _tail = null;
  }
}
/**
 * Marks the queue as closed and signals the wait handle so that a
 * thread blocked in Dequeue or Peek wakes up. After this call Enqueue
 * ignores new items.
 */
public void Close()
{
  lock( _sync )
  {
    _closed = true;
    //Wake up a blocking thread so it can notice the flag:
    _are.Set();
  }
#if DEBUG
  System.Console.WriteLine("Close set");
#endif
}
/**
 * Removes and returns the object at the front of the queue, waiting
 * without a time limit while the queue is empty.
 * @return the dequeued object
 * @throw InvalidOperationException if the queue is closed
 */
public object Dequeue() {
  //-1 means "wait forever", so the timeout flag is of no interest here
  bool ignored;
  return Dequeue(-1, out ignored);
}
/**
 * Removes and returns the object at the front of the queue.
 * @param millisec how many milliseconds to wait if the queue is empty
 * @param timedout set to true if the wait expired before an object arrived
 * @return the dequeued object, or null if we timed out
 * @throw InvalidOperationException if the BlockingQueue is closed and empty
 */
public object Dequeue(int millisec, out bool timedout)
{
  //The shared implementation removes the item when "advance" is true
  const bool advance = true;
  return Dequeue(millisec, out timedout, advance);
}
/**
 * Shared implementation of Dequeue and Peek.
 *
 * The queue is re-examined under the lock every time the wait handle
 * wakes us, rather than assuming that a wake-up means "an item is at the
 * head". This matters in three situations:
 *  - items were enqueued and the queue was then closed while we were
 *    waiting: the remaining items are still handed out, and the
 *    exception is only thrown once the queue is closed AND empty;
 *  - the item that caused the signal is already gone (another consumer
 *    took it, or Clear() ran): we go back to waiting instead of
 *    dereferencing a null head;
 *  - several threads are waiting: the handle auto-resets, so the signal
 *    is passed on when items remain or when the queue is closed.
 *
 * @param millisec how many milliseconds to wait if the queue is empty
 *                 (-1 waits without a time limit)
 * @param timedout set to true if the wait expired before an object arrived
 * @param advance true to remove the object (Dequeue), false to leave it
 *                at the head (Peek)
 * @return the object at the head, or null if we timed out
 * @throw InvalidOperationException if the BlockingQueue is closed and empty
 */
protected object Dequeue(int millisec, out bool timedout, bool advance)
{
  //For finite timeouts remember when we must give up, so that a wake-up
  //without an item does not restart the full timeout.
  bool finite = (millisec >= 0);
  DateTime deadline = DateTime.MinValue;
  if( finite ) {
    deadline = DateTime.UtcNow.AddMilliseconds(millisec);
  }
  int wait_ms = millisec;

  while( true ) {
    lock( _sync ) {
      if( _head != null ) {
        object val = _head.Value;
        if( advance ) {
          Entry old_head = _head;
          _head = _head.Next;
          if( _head == null ) { _tail = null; }
          _count = _count - 1;
          //Recycle the node we just unlinked:
          AddToPool(old_head);
        }
        if( _head != null ) {
          //Enqueue only signals on the empty -> non-empty transition, so
          //pass the signal on for any other thread that is still waiting.
          _are.Set();
        }
        timedout = false;
        return val;
      }
      //The queue is empty: check to see if it is also closed:
      if( _closed ) {
        timedout = false;
        //Leave the handle signalled so every other waiter wakes up too
        _are.Set();
        throw new InvalidOperationException("BlockingQueue is closed");
      }
      //Make sure we don't have any old signals waiting...
      _are.Reset();
    }
    //We have to wait (outside the lock, so producers can get in)
    if( !_are.WaitOne(wait_ms, false) ) {
      //We timed out:
      timedout = true;
      return null;
    }
    if( finite ) {
      double left = (deadline - DateTime.UtcNow).TotalMilliseconds;
      wait_ms = (left > 0) ? (int)left : 0;
    }
  }
}
/**
 * Puts a node that is no longer part of the queue on the recycling
 * list, unless the list already holds MAX_POOL nodes. Takes no lock
 * itself; Dequeue calls it while holding _sync.
 * @param e the unlinked node; its Value is cleared
 */
protected void AddToPool(Entry e) {
  if( _pool_count >= MAX_POOL ) {
    //Pool is full, just drop the node
    return;
  }
  e.Value = null;
  e.Next = _pool;
  _pool = e;
  _pool_count++;
}
/**
 * Returns a node holding v, reusing one from the recycling list when
 * possible and allocating a new one otherwise. Takes no lock itself;
 * Enqueue calls it while holding _sync.
 * @param v the object to store in the node
 * @return a node with Value == v and Next == null
 */
protected Entry GetEntry(object v) {
  Entry node;
  if( _pool == null ) {
    //Nothing to recycle
    node = new Entry();
  }
  else {
    //Pop the first recycled node
    node = _pool;
    _pool = node.Next;
    _pool_count--;
  }
  node.Next = null;
  node.Value = v;
  return node;
}
/**
 * Returns the object at the front of the queue without removing it,
 * waiting without a time limit while the queue is empty.
 * @return the object at the front of the queue
 * @throw InvalidOperationException if the queue is closed
 */
public object Peek() {
  //-1 means "wait forever", so the timeout flag is of no interest here
  bool ignored;
  return Peek(-1, out ignored);
}
/**
 * Returns the object at the front of the queue without removing it.
 * @param millisec how many milliseconds to wait if the queue is empty
 * @param timedout set to true if the wait expired before an object arrived
 * @return the object at the front, or null if we timed out
 * @throw InvalidOperationException if the BlockingQueue is closed and empty
 */
public object Peek(int millisec, out bool timedout)
{
  //The shared implementation leaves the item in place when "advance" is false
  const bool advance = false;
  return Dequeue(millisec, out timedout, advance);
}
/**
 * Appends an object to the end of the queue and then fires
 * EnqueueEvent. If the queue is closed the object is ignored and no
 * event is fired.
 * @param v the object to enqueue
 */
public void Enqueue(object v) {
  lock( _sync ) {
    //Don't do anything if the queue is closed
    if( _closed ) { return; }
    Entry e = GetEntry(v);
    if( _tail != null ) {
      _tail.Next = e;
    }
    _tail = e;
    _count = _count + 1;
    //Now we need to fix up the head:
    if( _head == null ) {
      //We just added to an empty queue
      _head = _tail;
      _are.Set();
    }
  }
  //After we have alerted any blocking threads (Set), fire the event.
  //Work on a local copy: the last handler may be removed by another
  //thread between the null test and the call, which would otherwise
  //result in a NullReferenceException.
  EventHandler handler = EnqueueEvent;
  if( handler != null ) {
    handler(this, EventArgs.Empty);
  }
}
//Number of values pushed through the queue by the test in Main
protected const int TEST_RUNS = 500000;
/**
 * Producer side of the test: enqueues TEST_RUNS pseudo-random numbers
 * and then closes the queue.
 */
public void TestThread1()
{
  //Seed the random number generator with the number 1, so the consumer
  //can reproduce the same sequence.
  Random rng = new Random(1);
  int sent = 0;
  while( sent < TEST_RUNS ) {
    Enqueue( rng.Next() );
    sent++;
  }
  Close();
}
/**
 * Consumer side of the test: starts TestThread1 on a second thread,
 * dequeues TEST_RUNS values and compares each with the value produced
 * by an identically seeded generator. Afterwards one more Dequeue is
 * expected to throw.
 */
public static void Main()
{
  BlockingQueue bq = new BlockingQueue();
  Thread producer = new Thread(bq.TestThread1);
  producer.Start();
  //Same seed as the producer, so both sequences must match
  Random expected = new Random(1);
  for(int i = 0; i < TEST_RUNS; i++) {
    int got = (int)bq.Dequeue();
    int want = expected.Next();
    if( got == want ) { continue; }
    Console.Error.WriteLine("{0}: {1} != {2}", i, got, want);
    //Pause so the mismatch can be inspected
    Console.ReadLine();
  }
  //The next dequeue should throw an exception
  bool threw;
  try {
    bq.Dequeue();
    threw = false;
  }
  catch(Exception) {
    threw = true;
  }
  if( !threw ) {
    Console.Error.WriteLine("Didn't get exception");
  }
}
}
}
/*
Copyright (C) 2005-2007 P. Oscar Boykin <[EMAIL PROTECTED]>, University of Florida
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
//#define DEBUG
using System.Collections;
using System.Threading;
using System;
#if BRUNET_NUNIT
using NUnit.Framework;
#endif
namespace Brunet
{
/**
* This class offers a means to pass objects in a queue
* between threads (or in the same thread). The Dequeue
* method will block until there is something in the Queue
*/
#if BRUNET_NUNIT
[TestFixture]
#endif
public class BlockingQueue {
/**
 * A node of the linked list that holds the queued objects; also used
 * for the list of recycled nodes (_spare_entry).
 */
protected class Entry {
//The object carried by this node
public object Value;
//The following node, or null. Declared as object because it is passed
//by ref to Interlocked.CompareExchange in Enqueue.
public object Next;
}
/**
 * Creates an open, empty queue. The list starts out with a single
 * placeholder node that both _head and _tail refer to.
 */
public BlockingQueue() {
  Entry placeholder = new Entry();
  _head = placeholder;
  _tail = placeholder;
  _count = 0;
  _closed = 0;
  _re = new AutoResetEvent(false);
}
//Signalled to let one caller of Dequeue or Peek through
protected readonly AutoResetEvent _re;
//0 while open, set to 1 by Close (an int so Interlocked can be used on it)
protected int _closed;
//Number of queued items, updated with Interlocked.Increment/Decrement
protected int _count;
//Placeholder node at the front of the list; the first real item is the
//node after it. Declared as object so it can be passed by ref to
//Interlocked.CompareExchange.
protected object _head;
//Node at (or near) the end of the list; Enqueue moves it forward
protected object _tail;
//To hopefully save on garbage collection, nodes removed by Dequeue are
//kept on a linked list of spare entries. PutToPool stops adding to the
//list once _pool_size reaches MAX_POOL_SIZE.
protected const int MAX_POOL_SIZE = 100;
//First node of the list of spare entries, or null
protected object _spare_entry;
//Number of spare entries, updated with Interlocked.Increment/Decrement
protected int _pool_size;
/**
 * True once Close() has been called
 */
public bool Closed {
  get {
    return 1 == _closed;
  }
}
/**
 * The current value of the item counter
 */
public int Count {
  get {
    return _count;
  }
}
/**
* When an item is enqueued, this event is fired
*/
public event EventHandler EnqueueEvent;
/* **********************************************
* Here are all the methods
*/
/**
 * Marks the queue as closed. Only the call that actually changes the
 * flag from 0 to 1 signals the wait handle; later calls leave it alone.
 * Once the queue is closed and emptied, Dequeue throws.
 */
public void Close()
{
  //CompareExchange returns the value the flag had before the call
  bool just_closed = ( Interlocked.CompareExchange(ref _closed, 1, 0) == 0 );
  if( just_closed )
  {
    //We just transitioned to Closed, set the ARE
    _re.Set();
  }
#if DEBUG
  System.Console.WriteLine("Close set");
#endif
}
/**
 * Removes and returns the object at the front of the queue, waiting
 * without a time limit for the wait handle to be signalled.
 * @return the dequeued object
 * @throw InvalidOperationException if the queue is closed
 */
public object Dequeue() {
  //-1 means "wait forever", so the timeout flag is of no interest here
  bool ignored;
  return Dequeue(-1, out ignored);
}
/**
 * Removes and returns the object at the front of the queue.
 *
 * The value is read from the node BEFORE the compare-and-swap that
 * advances _head. Once _head has moved, another consumer may dequeue
 * past that node and hand it to the pool, and a producer may then reuse
 * it and overwrite Value; reading afterwards could return the wrong
 * object.
 * NOTE(review): because nodes are recycled, the head compare-and-swap is
 * still exposed to the ABA problem when several consumers run
 * concurrently - not addressed here.
 *
 * @param millisec how many milliseconds to wait for the wait handle
 * @param timedout set to true if the wait expired
 * @return the object, if we timeout we return null
 * @throw InvalidOperationException if we were let through but there is
 *        no item to take (for example the queue is closed and empty)
 */
public object Dequeue(int millisec, out bool timedout)
{
  bool got_set = _re.WaitOne(millisec, false);
  if( !got_set ) {
    timedout = true;
    return null;
  }
  timedout = false;
  Entry temp_h = null;
  object temp_next = null;
  object val = null;
  /* Loop until we either hit an exception, or get a successful move of
   * head
   */
  do {
    temp_h = _head as Entry;
    temp_next = temp_h.Next;
    if( temp_next == null ) {
      //Looks like the queue is empty:
      if( Closed ) {
        //Make sure the next guy can move through:
        _re.Set();
      }
      else { Console.WriteLine("head.Next == null, but not closed"); }
      throw new InvalidOperationException("BlockingQueue is empty");
    }
    //Grab the value while the node is still guaranteed to be in the list
    val = ((Entry)temp_next).Value;
  } while( Interlocked.CompareExchange(ref _head, temp_next, temp_h) != temp_h );
  /* Now we have temp_h which was the old head */
  if( Interlocked.Decrement( ref _count ) > 0 || Closed ) {
    //There are more to get, let the next guy through:
    _re.Set();
  }
  //Save the old temp_h to possibly reuse
  PutToPool(temp_h);
  return val;
}
/**
 * Returns the object at the front of the queue without removing it,
 * waiting without a time limit for the wait handle to be signalled.
 * @return the object at the front of the queue
 * @throw InvalidOperationException if the queue is empty
 */
public object Peek() {
  //-1 means "wait forever", so the timeout flag is of no interest here
  bool ignored;
  return Peek(-1, out ignored);
}
/**
 * Returns the object at the front of the queue without removing it.
 * @param millisec how many milliseconds to wait for the wait handle
 * @param timedout set to true if the wait expired
 * @return the object, if we timeout we return null
 * @throw InvalidOperationException if we were let through but the queue
 *        holds no item
 */
public object Peek(int millisec, out bool timedout)
{
  bool got_set = _re.WaitOne(millisec, false);
  if( !got_set ) {
    timedout = true;
    return null;
  }
  timedout = false;
  /* Just look at the head */
  Entry temp_h = _head as Entry;
  /* we are not pulling anything out, so signal the wait handle again */
  _re.Set();
  /* Read Next exactly once: another thread may change it between a
   * null test and a second read.
   */
  object first = temp_h.Next;
  if( first == null ) {
    throw new InvalidOperationException("BlockingQueue is empty");
  }
  return ((Entry)first).Value;
}
/**
 * Appends an object to the end of the queue without taking a lock, then
 * fires EnqueueEvent. The wait handle is signalled when the item counter
 * goes from 0 to 1.
 * @param o the object to enqueue
 */
public void Enqueue(object o) {
  Entry e = GetFromPool();
  if( e == null ) {
    e = new Entry();
  }
  e.Value = o;
  e.Next = null;
  /* Link e behind the last node */
  Entry temp_t = null;
  object temp_next = null;
  bool cont = true;
  do {
    temp_t = _tail as Entry;
    /* Let's try to move the tail */
    temp_next = temp_t.Next;
    if( temp_next == null ) {
      /* try to set temp_t.Next
       * if temp_t.Next not null, we should continue.
       */
      cont = (Interlocked.CompareExchange(ref temp_t.Next, e, null) != null);
    }
    else {
      /*
       * Someone already updated temp_t.Next, let's see if we can
       * fix _tail
       */
      Interlocked.CompareExchange(ref _tail, temp_next, temp_t);
    }
  } while(cont);
  //Make sure the tail is up to date:
  Interlocked.CompareExchange(ref _tail, e, temp_t);
  if( Interlocked.Increment(ref _count) == 1 ) {
    //We just went from 0 -> 1 elements:
    _re.Set();
  }
  //Work on a local copy of the event: the last handler may be removed by
  //another thread between the null test and the call, which would
  //otherwise result in a NullReferenceException.
  EventHandler handler = EnqueueEvent;
  if( handler != null ) {
    handler(this, EventArgs.Empty);
  }
}
/**
 * Takes the first node off the list of spare entries so Enqueue does
 * not have to allocate a new one each time.
 * @return a spare node, or null if the list is empty
 */
protected Entry GetFromPool() {
  while( true ) {
    Entry top = (Entry) _spare_entry;
    if( top == null ) {
      //Nothing to reuse
      return null;
    }
    //Try to replace the list head with its successor; retry if another
    //thread changed the head in the meantime
    if( Interlocked.CompareExchange( ref _spare_entry, top.Next, top ) == top ) {
      Interlocked.Decrement(ref _pool_size );
      return top;
    }
  }
}
/**
 * Pushes a node onto the list of spare entries, unless the list already
 * holds MAX_POOL_SIZE nodes.
 * @param e the node to keep for reuse; its Next is overwritten
 */
protected void PutToPool(Entry e) {
  if( _pool_size < MAX_POOL_SIZE ) {
    object old_top;
    do {
      //Link e in front of the current head, then try to publish it;
      //retry if another thread changed the head in the meantime
      old_top = _spare_entry;
      e.Next = old_top;
    } while( Interlocked.CompareExchange( ref _spare_entry, e, old_top ) != old_top );
    Interlocked.Increment(ref _pool_size );
  }
}
//The stand-alone test is compiled only when the NUnit fixture is not:
//both define TestThread1, so enabling them together does not compile.
#if !BRUNET_NUNIT
//Number of values pushed through the queue by the test in Main
protected const int TEST_RUNS = 500000;
/**
 * Producer side of the test: enqueues TEST_RUNS pseudo-random numbers
 * and then closes the queue.
 */
public void TestThread1()
{
  //Seed the random number generator with the number 1, so the consumer
  //can reproduce the same sequence.
  Random r = new Random(1);
  for(int i = 0; i < TEST_RUNS; i++) {
    Enqueue( r.Next() );
  }
  Close();
}
/**
 * Consumer side of the test: starts TestThread1 on a second thread,
 * dequeues TEST_RUNS values and compares each with the value produced
 * by an identically seeded generator. Afterwards one more Dequeue is
 * expected to throw.
 */
public static void Main()
{
  BlockingQueue bq = new BlockingQueue();
  Thread t = new Thread(bq.TestThread1);
  t.Start();
  Random r = new Random(1);
  int i = 0;
  for(i = 0; i < TEST_RUNS; i++) {
    int j = (int)bq.Dequeue();
    int ra = r.Next();
    if( j != ra ) {
      Console.Error.WriteLine("{0} != {1}", j, ra);
    }
    //Console.WriteLine(i);
  }
  //The next dequeue should throw an exception
  bool got_exception = false;
  try {
    bq.Dequeue();
  }
  catch(Exception) { got_exception = true; }
  if( !got_exception ) {
    Console.Error.WriteLine("Didn't get exception");
  }
}
#endif
#if BRUNET_NUNIT
/**
 * Producer side of the NUnit test: enqueues 100000 pseudo-random
 * numbers and then closes the queue.
 */
public void TestThread1()
{
  //Seed the random number generator with the number 1, so the consumer
  //can reproduce the same sequence.
  Random rng = new Random(1);
  int sent = 0;
  while( sent < 100000 ) {
    Enqueue( rng.Next() );
    sent++;
  }
  Close();
}
/**
 * Consumer side of the NUnit test: starts TestThread1 on a second
 * thread, dequeues 100000 values and checks each against an identically
 * seeded generator. Two further Dequeue calls must both throw.
 */
[Test]
public void TestThread2()
{
  Thread t = new Thread(this.TestThread1);
  t.Start();
  Random r = new Random(1);
  for(int i = 0; i < 100000; i++) {
    //AreEqual takes the expected value first and the actual value second;
    //with the two swapped a failure report labels them the wrong way round
    Assert.AreEqual( r.Next(), Dequeue(), "dequeue equality test" );
  }
  // System.Console.WriteLine("Trying to get an exception");
  //The next dequeue should throw an exception
  bool got_exception = false;
  try {
    Dequeue();
  }
  catch(Exception) { got_exception = true; }
  Assert.IsTrue(got_exception, "got exception");
  //Try it again
  got_exception = false;
  try {
    Dequeue();
  }
  catch(Exception) { got_exception = true; }
  Assert.IsTrue(got_exception, "got exception");
}
#endif
}
}
_______________________________________________
Mono-list maillist - [email protected]
http://lists.ximian.com/mailman/listinfo/mono-list