Merge authors: Rick Stovall (fpstovall) Related merge proposals: https://code.launchpad.net/~fpstovall/nrtb/circular_queue/+merge/78527 proposed by: Rick Stovall (fpstovall) review: Approve - Aron Boyette (aron-carolina) review: Approve - George Jordan (gsjordanc) review: Approve - Rick Stovall (fpstovall) ------------------------------------------------------------ revno: 8 [merge] committer: fpstov...@gmail.com branch nick: alpha_merge timestamp: Fri 2011-10-07 20:36:10 -0400 message: Merged the circular_queue branch providing the C++ common circular_queue and linear_queue classes. This has been tested and is ready to use in alpha phase. added: common/circular_queue/ common/circular_queue/Makefile common/circular_queue/circular_queue.h common/circular_queue/circular_queue_test.cpp common/linear_queue/ common/linear_queue/Makefile common/linear_queue/linear_queue.h common/linear_queue/linear_queue_test.cpp modified: common/Makefile common/threads/base_thread.cpp
-- lp:nrtb https://code.launchpad.net/~nrtb-core/nrtb/alpha Your team NRTB Core is subscribed to branch lp:nrtb. To unsubscribe from this branch go to https://code.launchpad.net/~nrtb-core/nrtb/alpha/+edit-subscription
=== modified file 'common/Makefile' --- common/Makefile 2011-08-15 03:47:28 +0000 +++ common/Makefile 2011-10-07 00:01:15 +0000 @@ -40,6 +40,8 @@ @cd point; make ${action} @cd timer; make ${action} @cd threads; make ${action} + @cd circular_queue; make ${action} + @cd linear_queue; make ${action} @cd sockets; make ${action} @cd serializer; make ${action} @cd singleton; make ${action} === added directory 'common/circular_queue' === added file 'common/circular_queue/Makefile' --- common/circular_queue/Makefile 1970-01-01 00:00:00 +0000 +++ common/circular_queue/Makefile 2011-10-06 20:09:07 +0000 @@ -0,0 +1,31 @@ +#*********************************************** +#This file is part of the NRTB project (https://launchpad.net/nrtb). +# +# NRTB is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# NRTB is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with NRTB. If not, see <http://www.gnu.org/licenses/>. +# +#*********************************************** + +lib: circular_queue_test + @./circular_queue_test + @cp -v circular_queue.h ../include + @echo build complete + +circular_queue_test: circular_queue.h circular_queue_test.cpp + @rm -f circular_queue_test + g++ -c circular_queue_test.cpp -I../include + g++ -o circular_queue_test circular_queue_test.o ../obj/common.o ../obj/base_thread.o -lpthread + +clean: + @rm -rvf *.o circular_queue_test ../include/circular_queue.h *.log ../obj/circular_queue.o + @echo all objects and executables have been erased. 
=== added file 'common/circular_queue/circular_queue.h' --- common/circular_queue/circular_queue.h 1970-01-01 00:00:00 +0000 +++ common/circular_queue/circular_queue.h 2011-10-06 20:09:07 +0000 @@ -0,0 +1,174 @@ +/*********************************************** + This file is part of the NRTB project (https://*launchpad.net/nrtb). + + NRTB is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + NRTB is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with NRTB. If not, see <http://www.gnu.org/licenses/>. + + **********************************************/ + +#ifndef nrtb_circular_queue_h +#define nrtb_circular_queue_h + +#include <iostream> +#include <base_thread.h> +#include <boost/circular_buffer.hpp> + +namespace nrtb +{ + +/******************************************************** + * The circular_queue template is designed for use with + * the classic producer/consumer thread management model. + * The producer uses circular_queue::push() to put items + * in the queue as they become available, and the consumer + * thread calls circular_queue::pop() when it is ready + * for the next item to work. + * + * Common uses would be for buffering outgoing or incoming + * messages from a communications channel, providing a feed + * queue for parallel threads to make full use of multi-core + * processors, or any case where one or more threads are + * passing data to another set of threads. 
+********************************************************/
+template <class T>
+class circular_queue
+{
+public:
+  // thrown by push() and pop() once shutdown() has been called.
+  class queue_not_ready: public base_exception {};
+
+  /*********************************************
+   * Builds an empty queue whose capacity is
+   * "size" items. The capacity is handed to the
+   * underlying buffer at construction so the
+   * storage is set up before the queue is used.
+   *********************************************/
+  circular_queue(int size);
+
+  /*********************************************
+   * Destroys the queue along with anything
+   * still held in it.
+   *********************************************/
+  virtual ~circular_queue();
+
+  /*********************************************
+   * Adds an item at the back of the queue.
+   *********************************************/
+  void push(T item);
+
+  /*********************************************
+   * Removes and returns the item at the front
+   * of the queue. If the queue is empty the
+   * caller blocks until an item is available.
+   *********************************************/
+  T pop();
+
+  /*********************************************
+   * Switches the queue into shutdown mode.
+   *********************************************/
+  void shutdown();
+
+  // number of items currently held in the queue
+  int size();
+  // changes the capacity of the buffer; data may be lost
+  void resize(int newsize);
+  // empties the buffer; whatever it held is discarded.
+  void clear();
+
+protected:
+
+  // storage for the queued items
+  boost::circular_buffer<T> buffer;
+  // used both as the lock guarding buffer and as the
+  // condition consumers wait on
+  cond_variable buffer_lock;
+  // true from construction until shutdown() is called
+  bool ready;
+};
+
+template <class T>
+circular_queue<T>::circular_queue(int size)
+  : buffer(size),
+    ready(true)
+{
+}
+
+// TODO: needed ... a queue stop method.
+
+/*********************************************
+ * Nothing is released by hand here; the
+ * members clean up after themselves.
+ *********************************************/
+template <class T>
+circular_queue<T>::~circular_queue()
+{
+}
+
+/*********************************************
+ * Appends item to the buffer and signals the
+ * condition variable so a consumer blocked in
+ * pop() can pick it up.
+ *
+ * Throws queue_not_ready if shutdown() has
+ * already been called.
+ *
+ * The ready flag is tested while buffer_lock
+ * is held, so a push() that races with
+ * shutdown() can not slip an item into the
+ * buffer after it has been cleared.
+ *********************************************/
+template <class T>
+void circular_queue<T>::push(T item)
+{
+  scope_lock lock(buffer_lock);
+  if (!ready)
+  {
+    throw queue_not_ready();
+  }
+  buffer.push_back(item);
+  buffer_lock.signal();
+}
+
+/*********************************************
+ * Blocks until an item is available, then
+ * removes it from the front of the buffer and
+ * returns it.
+ *
+ * Throws queue_not_ready once shutdown() has
+ * been called, including for callers that
+ * were already waiting.
+ *********************************************/
+template <class T>
+T circular_queue<T>::pop()
+{
+  scope_lock lock(buffer_lock);
+  // re-check after every wakeup; leave the loop only when
+  // there is data or the queue has been shut down.
+  while (buffer.empty() && ready)
+  {
+    buffer_lock.wait();
+  }
+  if (!ready)
+  {
+    throw queue_not_ready();
+  }
+  T returnme = buffer.front();
+  buffer.pop_front();
+  return returnme;
+}
+
+/*********************************************
+ * Marks the queue as not ready, wakes every
+ * waiting consumer (each then gets
+ * queue_not_ready from pop()) and discards
+ * whatever is still queued.
+ *********************************************/
+template <class T>
+void circular_queue<T>::shutdown()
+{
+  try
+  {
+    scope_lock lock(buffer_lock);
+    ready = false;
+    buffer_lock.broadcast_signal();
+    buffer.clear();
+  }
+  // deliberately best effort: shutdown() never reports a failure.
+  catch (...) {}
+}
+
+
+// returns the number of items currently queued.
+template <class T>
+int circular_queue<T>::size()
+{
+  scope_lock lock(buffer_lock);
+  // buffer.size() is unsigned; the interface returns int.
+  return static_cast<int>(buffer.size());
+}
+
+// changes the capacity of the buffer; may cause data loss.
+template <class T>
+void circular_queue<T>::resize(int newsize)
+{
+  scope_lock lock(buffer_lock);
+  buffer.set_capacity(newsize);
+}
+
+// discards everything currently queued.
+template <class T>
+void circular_queue<T>::clear()
+{
+  scope_lock lock(buffer_lock);
+  buffer.clear();
+}
+
+} // namespace nrtb
+
+#endif //nrtb_circular_queue_h//
=== added file 'common/circular_queue/circular_queue_test.cpp'
--- common/circular_queue/circular_queue_test.cpp 1970-01-01 00:00:00 +0000
+++ common/circular_queue/circular_queue_test.cpp 2011-10-06 20:09:07 +0000
@@ -0,0 +1,152 @@
+/***********************************************
+ This file is part of the NRTB project (https://*launchpad.net/nrtb).
+
+ NRTB is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ NRTB is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with NRTB. If not, see <http://www.gnu.org/licenses/>.
+
+ **********************************************/
+
+#include <string>
+#include <iostream>
+#include <unistd.h>
+#include "circular_queue.h"
+#include <boost/shared_ptr.hpp>
+
+using namespace nrtb;
+using namespace std;
+
+typedef circular_queue<int> test_queue;
+typedef boost::shared_ptr<test_queue> queue_p;
+
+/************************************************
+ * Consumer thread used by the test. It pops
+ * integers from the shared queue until pop()
+ * throws, counting every item it receives.
+ ***********************************************/
+class consumer_task: public thread
+{
+public:
+
+  // n is the label used in console output; buffer is the
+  // queue this task reads from.
+  consumer_task(string n, queue_p buffer)
+    : input(buffer),
+      name(n),
+      count(0),
+      lastnum(0)
+  {
+  }
+
+  ~consumer_task()
+  {
+    cout << ">> in " << name << "::~consumer_task()" << endl;
+    // thread::~thread() runs automatically after this body and must
+    // not be invoked by hand: doing so would destroy the base
+    // sub-object twice. main() joins the task before releasing it.
+    input.reset();
+    cout << "<< leaving " << name << "::~consumer_task()" << endl;
+  }
+
+  // number of items processed so far. Read without locking, so
+  // the value is only final after the thread has been joined.
+  int get_count() { return count; }
+
+  // thread body: pops and reports items until the queue
+  // signals shutdown by throwing from pop().
+  void run()
+  {
+    try
+    {
+      while (true)
+      {
+        int num = input->pop();
+        {
+          // one mutex shared by all tasks keeps output lines whole.
+          static mutex console;
+          scope_lock lock(console);
+          cout << name << " picked up " << num
+            << endl;
+        }
+        count++;
+        lastnum = num;
+        yield();
+      }
+    }
+    catch (...)
+    {
+      // pop() throws once the queue is shut down; that is the
+      // expected way out of the loop, so there is nothing to do.
+    }
+  }
+
+protected:
+  // link to the feed queue
+  queue_p input;
+  // a name to report
+  string name;
+  // number of items processed
+  int count;
+  // last number caught
+  int lastnum;
+};
+
+typedef boost::shared_ptr<consumer_task> task_p;
+
+
+/************************************************
+ * Test driver. Returns 0 when the two consumers
+ * together processed exactly 100 items, and 1
+ * otherwise.
+ ***********************************************/
+int main()
+{
+  /************************************************
+   * Load queue and then cook it down...
+   ***********************************************/
+  // make and load a queue
+  queue_p q1(new test_queue(50));
+  for (int i=0; i<100; i++)
+  {
+    q1->push(i);
+  }
+  // the queue should be loaded with 50-99
+  // attach a thread and process it.
+  task_p p1(new consumer_task("task 1",q1));
+  p1->start();
+  while (q1->size()) usleep(100);
+  cout << "cp 1 " << p1->get_count() << endl;
+  /************************************************
+   * now that the preload is exhausted, shove items
+   * in one at a time to make sure each is picked
+   * up correctly.
+   ***********************************************/
+  for (int i=200; i<225; i++)
+  {
+    q1->push(i);
+    usleep(100);
+  }
+  cout << "cp 2 " << p1->get_count() << endl;
+  /************************************************
+   * Last check; attach a second thread to the queue
+   * and make sure both are servicing it.
+   ***********************************************/
+  task_p p2(new consumer_task("task 2",q1));
+  p2->start();
+  for (int i=300; i<325; i++)
+  {
+    q1->push(i);
+  }
+  while (q1->size()) usleep(100);
+  // shut it all down
+  q1->shutdown();
+  p1->join();
+  p2->join();
+  // important numbers: 50 preloaded + 25 + 25 items in total,
+  // of which the first 75 can only have gone to task 1.
+  int tot_items = p1->get_count() + p2->get_count();
+  int p1_items = p1->get_count() - 75;
+  int p2_items = p2->get_count();
+  // release the threads and queues.
+  p1.reset();
+  p2.reset();
+  q1.reset();
+  // do some reporting.
+  cout << "cp 3 "
+    << tot_items
+    << " [75 + (" << p1_items
+    << " + " << p2_items
+    << ")]" << endl;
+  bool passed = (tot_items == 100);
+  // inverted logic needed because 0 is good for
+  // return codes.
+  return !passed;
+}
\ No newline at end of file
=== added directory 'common/linear_queue'
=== added file 'common/linear_queue/Makefile'
--- common/linear_queue/Makefile 1970-01-01 00:00:00 +0000
+++ common/linear_queue/Makefile 2011-10-07 00:01:15 +0000
@@ -0,0 +1,31 @@
+#***********************************************
+#This file is part of the NRTB project (https://launchpad.net/nrtb).
+#
+# NRTB is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# +# NRTB is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with NRTB. If not, see <http://www.gnu.org/licenses/>. +# +#*********************************************** + +lib: linear_queue_test + @./linear_queue_test + @cp -v linear_queue.h ../include + @echo build complete + +linear_queue_test: linear_queue.h linear_queue_test.cpp + @rm -f linear_queue_test + g++ -c linear_queue_test.cpp -I../include + g++ -o linear_queue_test linear_queue_test.o ../obj/common.o ../obj/base_thread.o -lpthread + +clean: + @rm -rvf *.o linear_queue_test ../include/linear_queue.h *.log ../obj/linear_queue.o + @echo all objects and executables have been erased. === added file 'common/linear_queue/linear_queue.h' --- common/linear_queue/linear_queue.h 1970-01-01 00:00:00 +0000 +++ common/linear_queue/linear_queue.h 2011-10-07 00:01:15 +0000 @@ -0,0 +1,165 @@ +/*********************************************** + This file is part of the NRTB project (https://*launchpad.net/nrtb). + + NRTB is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + NRTB is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with NRTB. If not, see <http://www.gnu.org/licenses/>. 
+
+ **********************************************/
+
+#ifndef nrtb_linear_queue_h
+#define nrtb_linear_queue_h
+
+#include <iostream>
+#include <base_thread.h>
+#include <list>
+
+namespace nrtb
+{
+
+/********************************************************
+ * The linear_queue template is designed for use with
+ * the classic producer/consumer thread management model.
+ * The producer uses linear_queue::push() to put items
+ * in the queue as they become available, and the consumer
+ * thread calls linear_queue::pop() when it is ready
+ * for the next item to work on.
+ *
+ * This queue will expand as needed. Contrast this with
+ * the circular_queue, which is of a fixed size and will
+ * drop older data when full.
+ *
+ * Common uses would be for buffering outgoing or incoming
+ * messages from a communications channel, providing a feed
+ * queue for parallel threads to make full use of multi-core
+ * processors, or any case where one or more threads are
+ * passing data to another set of threads.
+********************************************************/
+template <class T>
+class linear_queue
+{
+public:
+  // thrown by push() and pop() once shutdown() has been called.
+  class queue_not_ready: public base_exception {};
+
+  /*********************************************
+   * creates an empty queue that is ready to
+   * accept items.
+   *********************************************/
+  linear_queue();
+
+  /*********************************************
+   * releases all items in the queue
+   *********************************************/
+  virtual ~linear_queue();
+
+  /*********************************************
+   * Puts an item in the queue. Throws
+   * queue_not_ready after shutdown().
+   *********************************************/
+  void push(T item);
+
+  /*********************************************
+   * Pops the next item off the queue, blocking
+   * if needed until an item becomes available.
+   * Throws queue_not_ready after shutdown().
+   *********************************************/
+  T pop();
+
+  /*********************************************
+   * puts the queue in shutdown mode: waiting
+   * consumers are released and queued data is
+   * discarded.
+   *********************************************/
+  void shutdown();
+
+  // returns the number of items in the queue
+  int size();
+  // clears the buffer, data will be discarded.
+  void clear();
+
+protected:
+
+  // storage for the queued items
+  std::list<T> buffer;
+  // used both as the lock guarding buffer and as the
+  // condition consumers wait on
+  cond_variable buffer_lock;
+  // true from construction until shutdown() is called
+  bool ready;
+};
+
+template <class T>
+linear_queue<T>::linear_queue()
+  : ready(true)
+{
+}
+
+template <class T>
+linear_queue<T>::~linear_queue()
+{
+}
+
+/*********************************************
+ * The ready flag is tested while buffer_lock
+ * is held, so a push() that races with
+ * shutdown() can not slip an item into the
+ * buffer after it has been cleared.
+ *********************************************/
+template <class T>
+void linear_queue<T>::push(T item)
+{
+  scope_lock lock(buffer_lock);
+  if (!ready)
+  {
+    throw queue_not_ready();
+  }
+  buffer.push_back(item);
+  // let a consumer blocked in pop() know there is work.
+  buffer_lock.signal();
+}
+
+template <class T>
+T linear_queue<T>::pop()
+{
+  scope_lock lock(buffer_lock);
+  // re-check after every wakeup; leave the loop only when
+  // there is data or the queue has been shut down.
+  while (buffer.empty() && ready)
+  {
+    buffer_lock.wait();
+  }
+  if (!ready)
+  {
+    throw queue_not_ready();
+  }
+  T returnme = buffer.front();
+  buffer.pop_front();
+  return returnme;
+}
+
+template <class T>
+void linear_queue<T>::shutdown()
+{
+  try
+  {
+    scope_lock lock(buffer_lock);
+    ready = false;
+    // wake every waiting consumer so each can leave pop().
+    buffer_lock.broadcast_signal();
+    buffer.clear();
+  }
+  // deliberately best effort: shutdown() never reports a failure.
+  catch (...) {}
+}
+
+
+template <class T>
+int linear_queue<T>::size()
+{
+  scope_lock lock(buffer_lock);
+  // buffer.size() is unsigned; the interface returns int.
+  return static_cast<int>(buffer.size());
+}
+
+template <class T>
+void linear_queue<T>::clear()
+{
+  scope_lock lock(buffer_lock);
+  buffer.clear();
+}
+
+} // namespace nrtb
+
+#endif //nrtb_linear_queue_h//
=== added file 'common/linear_queue/linear_queue_test.cpp'
--- common/linear_queue/linear_queue_test.cpp 1970-01-01 00:00:00 +0000
+++ common/linear_queue/linear_queue_test.cpp 2011-10-07 00:01:15 +0000
@@ -0,0 +1,152 @@
+/***********************************************
+ This file is part of the NRTB project (https://*launchpad.net/nrtb).
+
+ NRTB is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ NRTB is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with NRTB. If not, see <http://www.gnu.org/licenses/>.
+
+ **********************************************/
+
+#include <string>
+#include <iostream>
+#include <unistd.h>
+#include "linear_queue.h"
+#include <boost/shared_ptr.hpp>
+
+using namespace nrtb;
+using namespace std;
+
+typedef linear_queue<int> test_queue;
+typedef boost::shared_ptr<test_queue> queue_p;
+
+/************************************************
+ * Consumer thread used by the test. It pops
+ * integers from the shared queue until pop()
+ * throws, counting every item it receives.
+ ***********************************************/
+class consumer_task: public thread
+{
+public:
+
+  // n is the label used in console output; buffer is the
+  // queue this task reads from.
+  consumer_task(string n, queue_p buffer)
+    : input(buffer),
+      name(n),
+      count(0),
+      lastnum(0)
+  {
+  }
+
+  ~consumer_task()
+  {
+    cout << ">> in " << name << "::~consumer_task()" << endl;
+    // thread::~thread() runs automatically after this body and must
+    // not be invoked by hand: doing so would destroy the base
+    // sub-object twice. main() joins the task before releasing it.
+    input.reset();
+    cout << "<< leaving " << name << "::~consumer_task()" << endl;
+  }
+
+  // number of items processed so far. Read without locking, so
+  // the value is only final after the thread has been joined.
+  int get_count() { return count; }
+
+  // thread body: pops and reports items until the queue
+  // signals shutdown by throwing from pop().
+  void run()
+  {
+    try
+    {
+      while (true)
+      {
+        int num = input->pop();
+        {
+          // one mutex shared by all tasks keeps output lines whole.
+          static mutex console;
+          scope_lock lock(console);
+          cout << name << " picked up " << num
+            << endl;
+        }
+        count++;
+        lastnum = num;
+        yield();
+      }
+    }
+    catch (...)
+    {
+      // pop() throws once the queue is shut down; that is the
+      // expected way out of the loop, so there is nothing to do.
+    }
+  }
+
+protected:
+  // link to the feed queue
+  queue_p input;
+  // a name to report
+  string name;
+  // number of items processed
+  int count;
+  // last number caught
+  int lastnum;
+};
+
+typedef boost::shared_ptr<consumer_task> task_p;
+
+
+/************************************************
+ * Test driver. Returns 0 when the two consumers
+ * together processed exactly 150 items, and 1
+ * otherwise.
+ ***********************************************/
+int main()
+{
+  /************************************************
+   * Load queue and then cook it down...
+   ***********************************************/
+  // make and load a queue
+  queue_p q1(new test_queue());
+  for (int i=0; i<100; i++)
+  {
+    q1->push(i);
+  }
+  // the queue grows as needed, so it should now hold 0-99
+  // attach a thread and process it.
+  task_p p1(new consumer_task("task 1",q1));
+  p1->start();
+  while (q1->size()) usleep(100);
+  cout << "cp 1 " << p1->get_count() << endl;
+  /************************************************
+   * now that the preload is exhausted, shove items
+   * in one at a time to make sure each is picked
+   * up correctly.
+   ***********************************************/
+  for (int i=200; i<225; i++)
+  {
+    q1->push(i);
+    usleep(100);
+  }
+  cout << "cp 2 " << p1->get_count() << endl;
+  /************************************************
+   * Last check; attach a second thread to the queue
+   * and make sure both are servicing it.
+   ***********************************************/
+  task_p p2(new consumer_task("task 2",q1));
+  p2->start();
+  for (int i=300; i<325; i++)
+  {
+    q1->push(i);
+  }
+  while (q1->size()) usleep(100);
+  // shut it all down
+  q1->shutdown();
+  p1->join();
+  p2->join();
+  // important numbers: 100 preloaded + 25 + 25 items in total,
+  // of which the first 125 can only have gone to task 1.
+  int tot_items = p1->get_count() + p2->get_count();
+  int p1_items = p1->get_count() - 125;
+  int p2_items = p2->get_count();
+  // release the threads and queues.
+  p1.reset();
+  p2.reset();
+  q1.reset();
+  // do some reporting.
+  cout << "cp 3 "
+    << tot_items
+    << " [125 + (" << p1_items
+    << " + " << p2_items
+    << ")]" << endl;
+  bool passed = (tot_items == 150);
+  // inverted logic needed because 0 is good for
+  // return codes.
+  return !passed;
+}
\ No newline at end of file
=== modified file 'common/threads/base_thread.cpp' --- common/threads/base_thread.cpp 2011-09-15 01:21:11 +0000 +++ common/threads/base_thread.cpp 2011-10-06 20:06:58 +0000 @@ -352,22 +352,15 @@ // is there anyone waiting on this cond_variable? if (!try_lock() || (waiting > 0)) { - // not good, there are others waiting on us or we are locked. - unlock(); cerr << "WARNING: there were " << waiting << " threads queued in ~cond_variable." << endl; - } - else - { - pthread_cond_destroy(&mycv); }; - // unlock before we leave. - unlock(); } catch (...) 
{ cerr << "WARNING: there was an error in ~cond_variable." << endl;; }; + pthread_cond_destroy(&mycv); }; void cond_variable::lock()
_______________________________________________ Mailing list: https://launchpad.net/~nrtb-core Post to : nrtb-core@lists.launchpad.net Unsubscribe : https://launchpad.net/~nrtb-core More help : https://help.launchpad.net/ListHelp