Here you go.

On 6/22/07, Alan McGovern <[EMAIL PROTECTED]> wrote:
Can i see your testcase?
/*
Copyright (C) 2005  P. Oscar Boykin <[EMAIL PROTECTED]>, University of Florida

This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
*/

//#define DEBUG

using System.Collections;
using System.Threading;
using System;

namespace Brunet
{

/**
 * This class offers a means to pass objects in a queue
 * between threads (or in the same thread).  The Dequeue
 * method will block until there is something in the Queue
 */
public class BlockingQueue {
 
  public class Entry {
    public object Value;
    public Entry Next;
  }

  public BlockingQueue() {
    _are = new AutoResetEvent(false); 
    _closed = false;
    _sync = new object();
    _head = null;
    _tail = null;
    _count = 0;
    
    _pool = null;
    _pool_count = 0;
  }
 
  protected AutoResetEvent _are;
 
  protected bool _closed;

  protected readonly object _sync;
  protected Entry _head;
  protected Entry _tail;
  protected int _count;

  protected Entry _pool;
  protected int _pool_count;
  protected const int MAX_POOL = 100;

  public bool Closed { get { lock ( _sync ) { return _closed; } } }
  
  /**
   * When an item is enqueued, this event is fire
   */
  public event EventHandler EnqueueEvent;
  
  /* **********************************************
   * Here all the methods
   */
  
  public void Clear() {
    lock( _sync ) {
      _head = null; 
      _tail = null;
      _count = 0;
    }
  }
  
  /**
   * Once this method is called, and the queue is emptied,
   * all future Dequeue's will throw exceptions
   */
  public void Close() {
    lock( _sync ) {
      _closed = true;
      _are.Set();
    }
    //Wake up any blocking threads:
#if DEBUG
    System.Console.WriteLine("Close set");
#endif
  }
  
  /**
   * @throw Exception if the queue is closed
   */
  public object Dequeue() {
    bool timedout = false;
    return Dequeue(-1, out timedout);
  }

  /**
   * @param millisec how many milliseconds to wait if the queue is empty
   * @param timedout true if we have to wait too long to get an object
   * @return the object, if we timeout we return null
   * @throw Exception if the BlockingQueue is closed and empty
   */
  public object Dequeue(int millisec, out bool timedout)
  {
    return Dequeue(millisec, out timedout, true);
  }
  
  /**
   * Handles peeking and dequeueing
   */
  protected object Dequeue(int millisec, out bool timedout, bool advance)
  {
    object val = null;
    bool got_val = false;
    lock( _sync ) {
      if( _head != null ) {
        //Easy
        val = _head.Value;
        got_val = true;
	if( advance ) {
	  Entry tmp_h = _head;
	  _head = _head.Next;
          if( _head == null ) { _tail = null; }
	  _count = _count - 1;
          //Recycle e:
          AddToPool(tmp_h);
        }
      }
      else { 
        //The queue is empty: check to see if it is also closed:
	if( _closed ) {
          timedout = false;
	  throw new InvalidOperationException("BlockingQueue is closed");
	}
        //Make sure we don't have any old signals waiting...
        _are.Reset();
      }
    }
    if( !got_val ) {
      //We have to wait
      bool got_set = _are.WaitOne(millisec, false);
      lock( _sync ) {
        if( got_set ) {
	  if( _closed ) {
            timedout = false;
	    throw new InvalidOperationException("BlockingQueue is closed");
          }
	  //Advance the queue
          val = _head.Value;
          got_val = true;
	  if( advance ) {
            Entry tmp_h = _head;
	    _head = _head.Next;
            if( _head == null ) { _tail = null; }
	    _count = _count - 1;
            //Recycle e:
            AddToPool(tmp_h);
          }
        }
      }
    }
    if( got_val ) {
      timedout = false;
      return val;
    }
    else {
      //We timed out:
      timedout = true;
      return null;
    }
  }
  
  protected void AddToPool(Entry e) {
    if( _pool_count < MAX_POOL ) {
      e.Next = _pool;
      e.Value = null;
      _pool = e;
      _pool_count++;
    }
  }
  protected Entry GetEntry(object v) {
    //Let's see if we can recycle:
    Entry e = _pool;
    if( _pool != null ) {
      _pool = _pool.Next;
      _pool_count--;
    }
    else {
      e = new Entry();
    }
    e.Value = v; 
    e.Next = null;
    return e;
  }
  /**
   * @throw Exception if the queue is closed
   */
  public object Peek() {
    bool timedout = false;
    return Peek(-1, out timedout);
  }

  /**
   * @param millisec how many milliseconds to wait if the queue is empty
   * @param timedout true if we have to wait too long to get an object
   * @return the object, if we timeout we return null
   * @throw Exception if the BlockingQueue is closed and empty
   */
  public object Peek(int millisec, out bool timedout)
  {
    return Dequeue(millisec, out timedout, false);
  }

  public void Enqueue(object v) {
    bool fire = false;
    lock( _sync ) {
      //Don't do anything if the queue is closed
      if( _closed ) { return; }
      Entry e = GetEntry(v); 
      if( _tail != null ) {
        _tail.Next = e;
      }
      _tail = e;
      fire = true;
    //Now we need to fix up the head:
      _count = _count + 1;
      if( _head == null ) {
        //We just added to an empty queue
	_head = _tail;
        _are.Set();
      }
    }
    //After we have alerted any blocking threads (Set), fire
    //the event:
    if( fire && (EnqueueEvent != null) ) {
      EnqueueEvent(this, EventArgs.Empty);
    }
  }

  protected const int TEST_RUNS = 500000;

  public void TestThread1()
  {
    //See a random number generator with the number 1.
    Random r = new Random(1);
    for(int i = 0; i < TEST_RUNS; i++) { 
      Enqueue( r.Next() );
    }
    Close();
  }
  
  public static void Main()
  {
    BlockingQueue bq = new BlockingQueue();
    Thread t = new Thread(bq.TestThread1);
    t.Start();
    Random r = new Random(1);
    for(int i = 0; i < TEST_RUNS; i++) { 
      int j = (int)bq.Dequeue();
      int ra = r.Next();
      if( j != ra ) {
        Console.Error.WriteLine("{0}: {1} != {2}", i, j, ra);
	Console.ReadLine();
      }
    }
    //The next dequeue should throw an exception
    bool got_exception = false;
    try {
      bq.Dequeue();
    }
    catch(Exception x) { got_exception = true; }
    if( !got_exception ) {
      Console.Error.WriteLine("Didn't get exception");
    }
  }
}

}
/*
Copyright (C) 2005-2007  P. Oscar Boykin <[EMAIL PROTECTED]>, University of Florida

This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
*/

//#define DEBUG

using System.Collections;
using System.Threading;
using System;
#if BRUNET_NUNIT
using NUnit.Framework;
#endif

namespace Brunet
{

/**
 * This class offers a means to pass objects in a queue
 * between threads (or in the same thread).  The Dequeue
 * method will block until there is something in the Queue
 */
#if BRUNET_NUNIT
[TestFixture]
#endif
public class BlockingQueue {
  
  protected class Entry {
    public object Value;
    public object Next;
  }

  public BlockingQueue() {
    _re = new AutoResetEvent(false); 
    _closed = 0;
    _head = new Entry();
    _tail = _head;
    _count = 0;
  }
 
  protected readonly AutoResetEvent _re;
  protected int _closed;
  protected int _count;
  protected object _head;
  protected object _tail;
  //To hopefully save on garbage collection, we keep a spare entry around
  protected const int MAX_POOL_SIZE = 100;
  protected object _spare_entry;
  protected int _pool_size;

  public bool Closed { get { return (_closed == 1);  } }
  
  public int Count { get { return _count; } }
 
  /**
   * When an item is enqueued, this event is fire
   */
  public event EventHandler EnqueueEvent;
  
  /* **********************************************
   * Here all the methods
   */
 
  /**
   * Once this method is called, and the queue is emptied,
   * all future Dequeue's will throw exceptions
   */
  public void Close() {
    int o_c = Interlocked.CompareExchange(ref _closed, 1, 0);
    if( o_c != _closed ) {
      /*
       * We just transitioned to Closed, set the ARE
       */
      _re.Set();
    }
#if DEBUG
    System.Console.WriteLine("Close set");
#endif
  }
  
  /**
   * @throw Exception if the queue is closed
   */
  public object Dequeue() {
    bool timedout = false;
    return Dequeue(-1, out timedout);
  }

  /**
   * @param millisec how many milliseconds to wait if the queue is empty
   * @param timedout true if we have to wait too long to get an object
   * @return the object, if we timeout we return null
   * @throw Exception if the BlockingQueue is closed and empty
   */
  public object Dequeue(int millisec, out bool timedout)
  {
    bool got_set = _re.WaitOne(millisec, false);
    if( !got_set ) {
      timedout = true;
      return null;
    }
    else {
      timedout = false;
      /* Just look at the head */
      Entry temp_h = null;
      object temp_next = null;
      
      /* Loop until we either hit an exception, or get a successful move of
       * head
       */
      do {
        temp_h = _head as Entry;
        temp_next = temp_h.Next;
        if( temp_next == null ) {
          //Looks like the queue is empty:
          if( Closed ) {
            //Make sure the next guy can move through:
            _re.Set();
          }
          else { Console.WriteLine("head.Next == null, but not closed"); }
          throw new InvalidOperationException("BlockingQueue is empty");
        }
      } while( Interlocked.CompareExchange(ref _head, temp_next, temp_h) != temp_h );
      /* Now we have temp_h which was the old head */
      if( Interlocked.Decrement( ref _count ) > 0 || Closed ) {
        //There are more to get, let the next guy through:
        _re.Set();
      }
      //Save the old temp_h to possibly reuse
      PutToPool(temp_h);   
      return ((Entry)temp_next).Value; 
    }     
  }
  
  /**
   * @throw Exception if the queue is closed
   */
  public object Peek() {
    bool timedout = false;
    return Peek(-1, out timedout);
  }

  /**
   * @param millisec how many milliseconds to wait if the queue is empty
   * @param timedout true if we have to wait too long to get an object
   * @return the object, if we timeout we return null
   * @throw Exception if the BlockingQueue is closed and empty
   */
  public object Peek(int millisec, out bool timedout)
  {
    object val = null;
    bool got_set = _re.WaitOne(millisec, false);
    if( !got_set ) {
      timedout = true;
      return null;
    }
    else {
      timedout = false;
      /* Just look at the head */
      Entry temp_h = _head as Entry;
      /* we are not pulling anything out, so reset the wait handle */
      _re.Set();
      if( temp_h.Next != null ) {
        return ((Entry)temp_h.Next).Value;
      }
      else {
        throw new InvalidOperationException("BlockingQueue is empty");
      }
    }
  }

  public void Enqueue(object o) {
    Entry e = GetFromPool();
    if( e == null ) {
      e = new Entry();
    }
    e.Value = o;
    e.Next = null;
    /* the queue was not empty */
    Entry temp_t = null;
    object temp_next = null;
    bool cont = true;
    do {
      temp_t = _tail as Entry;
      /* Let's try to move the tail */
      temp_next = temp_t.Next;
      if( temp_next == null ) {
        /* try to set temp_t.Next 
         * if temp_t.Next not null, we should continue.
         */ 
        cont = (Interlocked.CompareExchange(ref temp_t.Next, e, null) != null);
      }
      else {
        /* 
         * Someone already updated temp_t.Next, let's see if we can
         * fix _tail
         */
        Interlocked.CompareExchange(ref _tail, temp_next, temp_t);
      }
    } while(cont);
    //Make sure the tail is up to date:
    Interlocked.CompareExchange(ref _tail, e, temp_t);
    if( Interlocked.Increment(ref _count) == 1 ) {
      //We just went from 0 -> 1 elements:
      _re.Set();
    }
    if( EnqueueEvent != null ) {
      EnqueueEvent(this, EventArgs.Empty);
    }
  }

  /**
   * Get an Entry from the pool so we don't have to do
   * a new each time
   */
  protected Entry GetFromPool() {
    Entry e = null;
    do {
      e = (Entry) _spare_entry;
      if( e == null ) { return null; }
    } while( Interlocked.CompareExchange( ref _spare_entry, e.Next, e ) != e );
    Interlocked.Decrement(ref _pool_size );
    return e;
  }
  protected void PutToPool(Entry e) {
    if ( _pool_size >= MAX_POOL_SIZE ) { return; }
    object tmp_e;
    do {
      tmp_e = _spare_entry;
      e.Next = tmp_e;
    }
    while( Interlocked.CompareExchange( ref _spare_entry, e, tmp_e ) != tmp_e );
    Interlocked.Increment(ref _pool_size );
  }

#if true
  protected const int TEST_RUNS = 500000;

  public void TestThread1()
  {
    //See a random number generator with the number 1.
    Random r = new Random(1);
    for(int i = 0; i < TEST_RUNS; i++) { 
      Enqueue( r.Next() );
    }
    Close();
  }
  
  public static void Main()
  {
    BlockingQueue bq = new BlockingQueue();
    Thread t = new Thread(bq.TestThread1);
    t.Start();
    Random r = new Random(1);
    int i = 0;
    for(i = 0; i < TEST_RUNS; i++) { 
      int j = (int)bq.Dequeue();
      int ra = r.Next();
      if( j != ra ) {
        Console.Error.WriteLine("{0} != {1}", j, ra);
      }
      //Console.WriteLine(i);
    }
    //The next dequeue should throw an exception
    bool got_exception = false;
    try {
      bq.Dequeue();
    }
    catch(Exception x) { got_exception = true; }
    if( !got_exception ) {
      Console.Error.WriteLine("Didn't get exception");
    }
  }

#endif

#if BRUNET_NUNIT
  public void TestThread1()
  {
    //See a random number generator with the number 1.
    Random r = new Random(1);
    for(int i = 0; i < 100000; i++) { 
      Enqueue( r.Next() );
    }
    Close();
  }
  
  [Test]
  public void TestThread2()
  {
    Thread t = new Thread(this.TestThread1);
    t.Start();
    Random r = new Random(1);
    for(int i = 0; i < 100000; i++) { 
      Assert.AreEqual( Dequeue(), r.Next(), "dequeue equality test" );
    }
//    System.Console.WriteLine("Trying to get an exception");
    //The next dequeue should throw an exception
    bool got_exception = false;
    try {
      Dequeue();
    }
    catch(Exception) { got_exception = true; }
    Assert.IsTrue(got_exception, "got exception");
    //Try it again
    got_exception = false;
    try {
      Dequeue();
    }
    catch(Exception) { got_exception = true; }
    Assert.IsTrue(got_exception, "got exception");
  }
#endif
}

}
_______________________________________________
Mono-list maillist  -  [email protected]
http://lists.ximian.com/mailman/listinfo/mono-list

Reply via email to