Merge authors:
  Rick Stovall (fpstovall)
Related merge proposals:
  https://code.launchpad.net/~fpstovall/nrtb/circular_queue/+merge/78527
  proposed by: Rick Stovall (fpstovall)
  review: Approve - Aron Boyette (aron-carolina)
  review: Approve - George Jordan (gsjordanc)
  review: Approve - Rick Stovall (fpstovall)
------------------------------------------------------------
revno: 8 [merge]
committer: fpstov...@gmail.com
branch nick: alpha_merge
timestamp: Fri 2011-10-07 20:36:10 -0400
message:
  Merged the circular_queue branch providing the C++ common circular_queue and 
linear_queue classes.
  
  This has been tested and is ready to use in alpha phase.
added:
  common/circular_queue/
  common/circular_queue/Makefile
  common/circular_queue/circular_queue.h
  common/circular_queue/circular_queue_test.cpp
  common/linear_queue/
  common/linear_queue/Makefile
  common/linear_queue/linear_queue.h
  common/linear_queue/linear_queue_test.cpp
modified:
  common/Makefile
  common/threads/base_thread.cpp


--
lp:nrtb
https://code.launchpad.net/~nrtb-core/nrtb/alpha

Your team NRTB Core is subscribed to branch lp:nrtb.
To unsubscribe from this branch go to 
https://code.launchpad.net/~nrtb-core/nrtb/alpha/+edit-subscription
=== modified file 'common/Makefile'
--- common/Makefile	2011-08-15 03:47:28 +0000
+++ common/Makefile	2011-10-07 00:01:15 +0000
@@ -40,6 +40,8 @@
 	@cd point; make ${action}
 	@cd timer; make ${action}
 	@cd threads; make ${action}
+	@cd circular_queue; make ${action}
+	@cd linear_queue; make ${action}
 	@cd sockets; make ${action}
 	@cd serializer; make ${action}
 	@cd singleton; make ${action}

=== added directory 'common/circular_queue'
=== added file 'common/circular_queue/Makefile'
--- common/circular_queue/Makefile	1970-01-01 00:00:00 +0000
+++ common/circular_queue/Makefile	2011-10-06 20:09:07 +0000
@@ -0,0 +1,31 @@
+#***********************************************
+#This file is part of the NRTB project (https://launchpad.net/nrtb).
+#
+#    NRTB is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    NRTB is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with NRTB.  If not, see <http://www.gnu.org/licenses/>.
+#
+#***********************************************
+
+lib:	circular_queue_test	# default target: run the unit test, then publish the header to ../include
+	@./circular_queue_test
+	@cp -v circular_queue.h ../include
+	@echo build complete
+
+circular_queue_test:	circular_queue.h circular_queue_test.cpp	# test binary; links the prebuilt ../obj/common.o and ../obj/base_thread.o
+	@rm -f circular_queue_test
+	g++ -c circular_queue_test.cpp -I../include
+	g++ -o circular_queue_test circular_queue_test.o ../obj/common.o ../obj/base_thread.o -lpthread 
+
+clean:	# removes local build products, logs, and the published copy of the header
+	@rm -rvf *.o circular_queue_test ../include/circular_queue.h *.log ../obj/circular_queue.o
+	@echo all objects and executables have been erased.

=== added file 'common/circular_queue/circular_queue.h'
--- common/circular_queue/circular_queue.h	1970-01-01 00:00:00 +0000
+++ common/circular_queue/circular_queue.h	2011-10-06 20:09:07 +0000
@@ -0,0 +1,174 @@
+/***********************************************
+ This file is part of the NRTB project (https://launchpad.net/nrtb).
+
+ NRTB is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ NRTB is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with NRTB.  If not, see <http://www.gnu.org/licenses/>.
+
+ **********************************************/
+
+#ifndef nrtb_circular_queue_h
+#define nrtb_circular_queue_h
+
+#include <iostream>
+#include <base_thread.h>
+#include <boost/circular_buffer.hpp>
+
+namespace nrtb
+{
+
+/********************************************************
+ * The circular_queue template is designed for use with
+ * the classic producer/consumer thread management model.
+ * The producer uses circular_queue::push() to put items
+ * in the queue as they become available, and the consumer
+ * thread calls circular_queue::pop() when it is ready
+ * for the next item to work.
+ *
+ * Common uses would be for buffering outgoing or incoming
+ * messages from a communications channel, providing a feed
+ * queue for parallel threads to make full use of multi-core
+ * processors, or any case where one or more threads are
+ * passing data to another set of threads.
+********************************************************/
+template <class T>
+class circular_queue
+{
+public:
+    class queue_not_ready: public base_exception {}; // thrown by push() and pop() once shutdown() has been called
+
+    /*********************************************
+      * creates the queue with room for the specified
+      * number of elements. All memory is allocated
+      * at construction to minimize delays at runtime.
+    *********************************************/
+    circular_queue(int size);
+
+    /*********************************************
+      * releases all items in the queue
+    *********************************************/
+    virtual ~circular_queue();
+
+    /*********************************************
+      * Puts an item in the queue; throws queue_not_ready after shutdown().
+    *********************************************/
+    void push(T item);
+
+    /*********************************************
+      * Pops the next item off the queue, blocking until one is
+      * available; throws queue_not_ready after shutdown().
+    *********************************************/
+    T pop();
+
+    /*********************************************
+     * puts the queue in shutdown mode: wakes all waiters, discards queued data.
+    *********************************************/
+    void shutdown();
+
+    // returns the number of items currently in the queue
+    int size();
+    // changes the buffer capacity, may cause data loss
+    void resize(int newsize);
+    // clears the buffer, data will be discarded.
+    void clear();
+
+protected:
+
+    boost::circular_buffer<T> buffer; // item storage; capacity is set by the constructor and resize()
+    cond_variable buffer_lock; // taken through scope_lock; pop() waits on it, push()/shutdown() signal it
+    bool ready; // true from construction until shutdown() clears it
+};
+
+template <class T>
+circular_queue<T>::circular_queue(int size) // size: capacity handed to the underlying circular buffer
+{
+    buffer.set_capacity(size); // buffer is default-constructed first, then given its capacity here
+    ready = true;              // queue accepts push()/pop() from now until shutdown()
+};
+
+// TODO: needed ... a queue stop method (shutdown() below may already cover this; confirm and drop this note).
+
+template <class T>
+circular_queue<T>::~circular_queue() // empty body: no explicit cleanup and no call to shutdown() happens here
+{
+};
+
+template <class T>
+void circular_queue<T>::push(T item) // appends item and signals the condition variable; throws queue_not_ready after shutdown()
+{
+  if (ready) // NOTE(review): ready is read before the lock is taken, so a concurrent shutdown() can slip in after this check
+  {
+	scope_lock lock(buffer_lock);
+    buffer.push_back(item); // presumably overwrites the oldest entry when full (the test expects 50-99 to survive 100 pushes into 50 slots)
+    buffer_lock.signal(); // let a consumer blocked in pop() know data is available
+  }
+  else 
+  {
+	queue_not_ready e; // queue has been shut down; refuse the item
+	  throw e;
+  }
+};
+
+template <class T>
+T circular_queue<T>::pop() // returns a copy of the oldest item; blocks while empty; throws queue_not_ready after shutdown()
+{
+    scope_lock lock(buffer_lock);
+    while (buffer.empty() && ready) // loop re-tests the condition after every wake-up
+        buffer_lock.wait();
+    if (!ready) // shutdown() ran (possibly while we were waiting): report it instead of returning data
+    {
+        queue_not_ready e;
+        throw e;
+    };
+	T returnme = buffer.front(); // copy the item out before removing it from the buffer
+	buffer.pop_front();
+	return returnme;
+};
+
+template <class T>
+void circular_queue<T>::shutdown() // marks the queue not ready, wakes every waiter, and drops queued items; never throws
+{
+  try
+  {
+	scope_lock lock(buffer_lock);
+	ready = false; // from here on push() and pop() throw queue_not_ready
+	buffer_lock.broadcast_signal(); // release every thread blocked in pop() so it can see ready == false
+	buffer.clear(); // items still queued at shutdown are discarded, not delivered
+  }
+  catch (...) {}  // deliberately swallows any failure from locking/signalling
+}
+
+
+template <class T>
+int circular_queue<T>::size() // number of items currently queued, read under the lock
+{
+    scope_lock lock(buffer_lock);
+    return buffer.size(); // container size is converted to the int return type
+};
+
+template <class T>
+void circular_queue<T>::resize(int newsize) // newsize: new capacity; per the class notes this may cause data loss
+{
+    scope_lock lock(buffer_lock);
+    buffer.set_capacity(newsize); // changes capacity, not the number of stored items directly
+};
+
+template <class T>
+void circular_queue<T>::clear() // discards every queued item; does not change ready
+{
+    scope_lock lock(buffer_lock);
+    buffer.clear(); // no signal is sent: consumers blocked in pop() stay blocked
+};
+
+} // namespace nrtb
+
+#endif //nrtb_circular_queue_h//

=== added file 'common/circular_queue/circular_queue_test.cpp'
--- common/circular_queue/circular_queue_test.cpp	1970-01-01 00:00:00 +0000
+++ common/circular_queue/circular_queue_test.cpp	2011-10-06 20:09:07 +0000
@@ -0,0 +1,152 @@
+/***********************************************
+ This file is part of the NRTB project (https://launchpad.net/nrtb).
+ 
+ NRTB is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+ 
+ NRTB is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ GNU General Public License for more details.
+ 
+ You should have received a copy of the GNU General Public License
+ along with NRTB.  If not, see <http://www.gnu.org/licenses/>.
+ 
+ **********************************************/
+ 
+#include <string>
+#include <iostream>
+#include "circular_queue.h"
+#include <boost/shared_ptr.hpp>
+
+using namespace nrtb;
+using namespace std;
+
+typedef circular_queue<int> test_queue;
+typedef boost::shared_ptr<test_queue> queue_p;
+
+class consumer_task: public thread // test consumer: pops ints from a shared queue and counts them
+{
+public:
+
+  consumer_task(string n, queue_p buffer) // n: label used in output; buffer: queue to consume from
+  {
+	name = n;
+	input = buffer;
+	count = 0; // NOTE(review): lastnum is left uninitialised here
+  };
+  
+  ~consumer_task() // logs entry/exit and drops the queue reference
+  {
+	cout << ">> in " << name << "::~consumer_task()" << endl;
+	try
+	{
+	  this->thread::~thread(); // NOTE(review): the base destructor also runs implicitly after this body, so this destroys the base twice
+	  input.reset();
+	}
+	catch (...)  {}; // any failure during teardown is ignored
+	cout << "<< leaving " << name << "::~consumer_task()" << endl;
+  };
+  
+  int get_count() { return count; }; // NOTE(review): read from main() while run() may be incrementing; no synchronisation
+  
+  void run() // thread body: consume until pop() throws
+  {
+	try
+	{
+	  while (true)
+	  {
+		int num = input->pop(); // blocks until an item arrives; throws once the queue is shut down
+		{
+		  static mutex console; // function-local static, so all consumer_task objects share this one mutex
+		  scope_lock lock(console);
+		  cout << name << " picked up " << num
+			<< endl;
+		};
+		count++;
+		lastnum = num; // recorded but not read anywhere in this file
+		yield();	  
+	  }
+	}
+	catch (...) {}; // the exception from pop() after shutdown() is what ends the loop and the thread
+  };
+
+protected:
+  // shared handle to the feed queue
+  queue_p input;
+  // a name to report in console output
+  string name;
+  // number of items processed so far
+  int count;
+  // last number popped
+  int lastnum;
+};
+
+typedef boost::shared_ptr<consumer_task> task_p;
+
+
+int main()
+{
+  int er_count = 0; // NOTE(review): never used below
+  /************************************************
+   * Load queue and then cook it down...
+   ***********************************************/
+  // make a 50-slot queue and push 100 items into it
+  queue_p q1(new test_queue(50));
+  for (int i=0; i<100; i++)
+  {
+	q1->push(i);
+  };
+  // the queue should be loaded with 50-99
+  // attach a thread and process it.
+  task_p p1(new consumer_task("task 1",q1));
+  p1->start();
+  while (q1->size()) usleep(100); // poll until the consumer has emptied the queue
+  cout << "cp 1 " << p1->get_count() << endl; // NOTE(review): may lag by one; size() hits 0 before the consumer's count++
+  /************************************************
+   * now that the preload is exhausted, shove items
+   * in one at a time to make sure each is picked
+   * up correctly.
+   ***********************************************/
+  for (int i=200; i<225; i++)
+  {
+	q1->push(i);
+	usleep(100);
+  };
+  cout << "cp 2 " << p1->get_count() << endl;
+  /************************************************
+   * Last check; attach a second thread to the queue
+   * and make sure both are servicing it.
+   ***********************************************/
+  task_p p2(new consumer_task("task 2",q1));
+  p2->start();
+  for (int i=300; i<325; i++)
+  {
+	q1->push(i);
+  };
+  while (q1->size()) usleep(100); // wait for the queue to drain before shutting down (shutdown discards leftovers)
+  // shut it all down
+  q1->shutdown();
+  p1->join();
+  p2->join();
+  // important numbers: 50 preloaded + 25 + 25 = 100 expected in total
+  int tot_items = p1->get_count() + p2->get_count();
+  int p1_items = p1->get_count() - 75; // assumes task 1 handled all 75 items pushed before task 2 started
+  int p2_items = p2->get_count();
+  // release the threads and queues.
+  p1.reset();
+  p2.reset();
+  q1.reset();
+  // do some reporting.
+  cout << "cp 3 " 
+	<<  tot_items
+	<< " [75 + (" << p1_items
+	<< " + " << p2_items 
+	<< ")]" << endl;
+	bool passed = (tot_items == 100);
+  // inverted logic needed because 0 is good for 
+  // return codes.
+  return !passed; 
+};
\ No newline at end of file

=== added directory 'common/linear_queue'
=== added file 'common/linear_queue/Makefile'
--- common/linear_queue/Makefile	1970-01-01 00:00:00 +0000
+++ common/linear_queue/Makefile	2011-10-07 00:01:15 +0000
@@ -0,0 +1,31 @@
+#***********************************************
+#This file is part of the NRTB project (https://launchpad.net/nrtb).
+#
+#    NRTB is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    NRTB is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with NRTB.  If not, see <http://www.gnu.org/licenses/>.
+#
+#***********************************************
+
+lib:	linear_queue_test	# default target: run the unit test, then publish the header to ../include
+	@./linear_queue_test
+	@cp -v linear_queue.h ../include
+	@echo build complete
+
+linear_queue_test:	linear_queue.h linear_queue_test.cpp	# test binary; links the prebuilt ../obj/common.o and ../obj/base_thread.o
+	@rm -f linear_queue_test
+	g++ -c linear_queue_test.cpp -I../include
+	g++ -o linear_queue_test linear_queue_test.o ../obj/common.o ../obj/base_thread.o -lpthread 
+
+clean:	# removes local build products, logs, and the published copy of the header
+	@rm -rvf *.o linear_queue_test ../include/linear_queue.h *.log ../obj/linear_queue.o
+	@echo all objects and executables have been erased.

=== added file 'common/linear_queue/linear_queue.h'
--- common/linear_queue/linear_queue.h	1970-01-01 00:00:00 +0000
+++ common/linear_queue/linear_queue.h	2011-10-07 00:01:15 +0000
@@ -0,0 +1,165 @@
+/***********************************************
+ This file is part of the NRTB project (https://launchpad.net/nrtb).
+
+ NRTB is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ NRTB is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with NRTB.  If not, see <http://www.gnu.org/licenses/>.
+
+ **********************************************/
+
+#ifndef nrtb_linear_queue_h
+#define nrtb_linear_queue_h
+
+#include <iostream>
+#include <base_thread.h>
+#include <list>
+
+namespace nrtb
+{
+
+/********************************************************
+ * The linear_queue template is designed for use with
+ * the classic producer/consumer thread management model.
+ * The producer uses linear_queue::push() to put items
+ * in the queue as they become available, and the consumer
+ * thread calls linear_queue::pop() when it is ready
+ * for the next item to work.
+ * 
+ * This queue will expand as needed. Contrast this with
+ * the circular_queue, which is of a fixed size and will 
+ * drop older data when full.
+ *
+ * Common uses would be for buffering outgoing or incoming
+ * messages from a communications channel, providing a feed
+ * queue for parallel threads to make full use of multi-core
+ * processors, or any case where one or more threads are
+ * passing data to another set of threads.
+********************************************************/
+template <class T>
+class linear_queue
+{
+public:
+    class queue_not_ready: public base_exception {}; // thrown by push() and pop() once shutdown() has been called
+
+    /*********************************************
+      * creates an empty queue; it takes no size
+      * argument because storage grows as needed.
+    *********************************************/
+    linear_queue();
+
+    /*********************************************
+      * releases all items in the queue
+    *********************************************/
+    virtual ~linear_queue();
+
+    /*********************************************
+      * Puts an item in the queue; throws queue_not_ready after shutdown().
+    *********************************************/
+    void push(T item);
+
+    /*********************************************
+      * Pops the next item off the queue, blocking until one is
+      * available; throws queue_not_ready after shutdown().
+    *********************************************/
+    T pop();
+
+    /*********************************************
+     * puts the queue in shutdown mode: wakes all waiters, discards queued data.
+    *********************************************/
+    void shutdown();
+
+    // returns the number of items currently in the queue
+    int size();
+    // clears the buffer, data will be discarded.
+    void clear();
+
+protected:
+
+    std::list<T> buffer; // item storage; push() appends at the back, pop() removes from the front
+    cond_variable buffer_lock; // taken through scope_lock; pop() waits on it, push()/shutdown() signal it
+    bool ready; // true from construction until shutdown() clears it
+};
+
+template <class T>
+linear_queue<T>::linear_queue() // starts with an empty list
+{
+    ready = true; // queue accepts push()/pop() from now until shutdown()
+};
+
+template <class T>
+linear_queue<T>::~linear_queue() // empty body: no explicit cleanup and no call to shutdown() happens here
+{
+};
+
+template <class T>
+void linear_queue<T>::push(T item) // appends item and signals the condition variable; throws queue_not_ready after shutdown()
+{
+  if (ready) // NOTE(review): ready is read before the lock is taken, so a concurrent shutdown() can slip in after this check
+  {
+	scope_lock lock(buffer_lock);
+    buffer.push_back(item); // the list grows; nothing is dropped
+    buffer_lock.signal(); // let a consumer blocked in pop() know data is available
+  }
+  else 
+  {
+	queue_not_ready e; // queue has been shut down; refuse the item
+	  throw e;
+  }
+};
+
+template <class T>
+T linear_queue<T>::pop() // returns a copy of the oldest item; blocks while empty; throws queue_not_ready after shutdown()
+{
+    scope_lock lock(buffer_lock);
+    while (buffer.empty() && ready) // loop re-tests the condition after every wake-up
+        buffer_lock.wait();
+    if (!ready) // shutdown() ran (possibly while we were waiting): report it instead of returning data
+    {
+        queue_not_ready e;
+        throw e;
+    };
+	T returnme = buffer.front(); // copy the item out before removing it from the list
+	buffer.pop_front();
+	return returnme;
+};
+
+template <class T>
+void linear_queue<T>::shutdown() // marks the queue not ready, wakes every waiter, and drops queued items; never throws
+{
+  try
+  {
+	scope_lock lock(buffer_lock);
+	ready = false; // from here on push() and pop() throw queue_not_ready
+	buffer_lock.broadcast_signal(); // release every thread blocked in pop() so it can see ready == false
+	buffer.clear(); // items still queued at shutdown are discarded, not delivered
+  }
+  catch (...) {}  // deliberately swallows any failure from locking/signalling
+}
+
+
+template <class T>
+int linear_queue<T>::size() // number of items currently queued, read under the lock
+{
+    scope_lock lock(buffer_lock);
+    return buffer.size(); // container size is converted to the int return type
+};
+
+template <class T>
+void linear_queue<T>::clear() // discards every queued item; does not change ready
+{
+    scope_lock lock(buffer_lock);
+    buffer.clear(); // no signal is sent: consumers blocked in pop() stay blocked
+};
+
+} // namespace nrtb
+
+#endif //nrtb_linear_queue_h//

=== added file 'common/linear_queue/linear_queue_test.cpp'
--- common/linear_queue/linear_queue_test.cpp	1970-01-01 00:00:00 +0000
+++ common/linear_queue/linear_queue_test.cpp	2011-10-07 00:01:15 +0000
@@ -0,0 +1,152 @@
+/***********************************************
+ This file is part of the NRTB project (https://launchpad.net/nrtb).
+ 
+ NRTB is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+ 
+ NRTB is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ GNU General Public License for more details.
+ 
+ You should have received a copy of the GNU General Public License
+ along with NRTB.  If not, see <http://www.gnu.org/licenses/>.
+ 
+ **********************************************/
+ 
+#include <string>
+#include <iostream>
+#include "linear_queue.h"
+#include <boost/shared_ptr.hpp>
+
+using namespace nrtb;
+using namespace std;
+
+typedef linear_queue<int> test_queue;
+typedef boost::shared_ptr<test_queue> queue_p;
+
+class consumer_task: public thread // test consumer: pops ints from a shared queue and counts them
+{
+public:
+
+  consumer_task(string n, queue_p buffer) // n: label used in output; buffer: queue to consume from
+  {
+	name = n;
+	input = buffer;
+	count = 0; // NOTE(review): lastnum is left uninitialised here
+  };
+  
+  ~consumer_task() // logs entry/exit and drops the queue reference
+  {
+	cout << ">> in " << name << "::~consumer_task()" << endl;
+	try
+	{
+	  this->thread::~thread(); // NOTE(review): the base destructor also runs implicitly after this body, so this destroys the base twice
+	  input.reset();
+	}
+	catch (...)  {}; // any failure during teardown is ignored
+	cout << "<< leaving " << name << "::~consumer_task()" << endl;
+  };
+  
+  int get_count() { return count; }; // NOTE(review): read from main() while run() may be incrementing; no synchronisation
+  
+  void run() // thread body: consume until pop() throws
+  {
+	try
+	{
+	  while (true)
+	  {
+		int num = input->pop(); // blocks until an item arrives; throws once the queue is shut down
+		{
+		  static mutex console; // function-local static, so all consumer_task objects share this one mutex
+		  scope_lock lock(console);
+		  cout << name << " picked up " << num
+			<< endl;
+		};
+		count++;
+		lastnum = num; // recorded but not read anywhere in this file
+		yield();	  
+	  }
+	}
+	catch (...) {}; // the exception from pop() after shutdown() is what ends the loop and the thread
+  };
+
+protected:
+  // shared handle to the feed queue
+  queue_p input;
+  // a name to report in console output
+  string name;
+  // number of items processed so far
+  int count;
+  // last number popped
+  int lastnum;
+};
+
+typedef boost::shared_ptr<consumer_task> task_p;
+
+
+int main()
+{
+  int er_count = 0; // NOTE(review): never used below
+  /************************************************
+   * Load queue and then cook it down...
+   ***********************************************/
+  // make an unbounded queue and push 100 items into it
+  queue_p q1(new test_queue());
+  for (int i=0; i<100; i++)
+  {
+	q1->push(i);
+  };
+  // the queue should now hold all 100 items (0-99)
+  // attach a thread and process it.
+  task_p p1(new consumer_task("task 1",q1));
+  p1->start();
+  while (q1->size()) usleep(100); // poll until the consumer has emptied the queue
+  cout << "cp 1 " << p1->get_count() << endl; // NOTE(review): may lag by one; size() hits 0 before the consumer's count++
+  /************************************************
+   * now that the preload is exhausted, shove items
+   * in one at a time to make sure each is picked
+   * up correctly.
+   ***********************************************/
+  for (int i=200; i<225; i++)
+  {
+	q1->push(i);
+	usleep(100);
+  };
+  cout << "cp 2 " << p1->get_count() << endl;
+  /************************************************
+   * Last check; attach a second thread to the queue
+   * and make sure both are servicing it.
+   ***********************************************/
+  task_p p2(new consumer_task("task 2",q1));
+  p2->start();
+  for (int i=300; i<325; i++)
+  {
+	q1->push(i);
+  };
+  while (q1->size()) usleep(100); // wait for the queue to drain before shutting down (shutdown discards leftovers)
+  // shut it all down
+  q1->shutdown();
+  p1->join();
+  p2->join();
+  // important numbers: 100 preloaded + 25 + 25 = 150 expected in total
+  int tot_items = p1->get_count() + p2->get_count();
+  int p1_items = p1->get_count() - 125; // assumes task 1 handled all 125 items pushed before task 2 started
+  int p2_items = p2->get_count();
+  // release the threads and queues.
+  p1.reset();
+  p2.reset();
+  q1.reset();
+  // do some reporting.
+  cout << "cp 3 " 
+	<<  tot_items
+	<< " [125 + (" << p1_items
+	<< " + " << p2_items 
+	<< ")]" << endl;
+	bool passed = (tot_items == 150);
+  // inverted logic needed because 0 is good for 
+  // return codes.
+  return !passed; 
+};
\ No newline at end of file

=== modified file 'common/threads/base_thread.cpp'
--- common/threads/base_thread.cpp	2011-09-15 01:21:11 +0000
+++ common/threads/base_thread.cpp	2011-10-06 20:06:58 +0000
@@ -352,22 +352,15 @@
 		// is there anyone waiting on this cond_variable?
 		if (!try_lock() || (waiting > 0))
 		{	
-			// not good, there are others waiting on us or we are locked.
-			unlock();
 			cerr << "WARNING: there were " << waiting << 
 			  " threads queued in ~cond_variable." << endl; 
-		}
-		else
-		{
-			pthread_cond_destroy(&mycv);
 		};
-		// unlock before we leave.
-		unlock();
 	}
 	catch (...)
 	{
 		cerr << "WARNING: there was an error in ~cond_variable." << endl;;
 	};
+	pthread_cond_destroy(&mycv);
 };
 
 void cond_variable::lock()

_______________________________________________
Mailing list: https://launchpad.net/~nrtb-core
Post to     : nrtb-core@lists.launchpad.net
Unsubscribe : https://launchpad.net/~nrtb-core
More help   : https://help.launchpad.net/ListHelp

Reply via email to