Rick Stovall has proposed merging lp:~fpstovall/nrtb/unique_ptr_fix into 
lp:nrtb.

Requested reviews:
  NRTB Core (nrtb-core): code

For more details, see:
https://code.launchpad.net/~fpstovall/nrtb/unique_ptr_fix/+merge/100340

First of the changes to bring the code up to the C++11 spec, focused particularly
on replacing boost::shared_ptr with std::unique_ptr where appropriate.

This merge includes significant updates to the sockets and transceiver libs,
improving stability and reporting, and enforcing the logical constraint that a
socket can have only one owner and that code which accepts a connection from
the tcp_server socket factory must take actual ownership of the new socket.
Fixes were also added which allowed the removal of arbitrary time delays in the
associated unit test programs.

This will be the first of several merges assuming the move to C++11 is 
approved. 
-- 
https://code.launchpad.net/~fpstovall/nrtb/unique_ptr_fix/+merge/100340
Your team NRTB Core is requested to review the proposed merge of 
lp:~fpstovall/nrtb/unique_ptr_fix into lp:nrtb.
=== modified file 'common/sockets/Makefile'
--- common/sockets/Makefile	2011-09-17 01:21:36 +0000
+++ common/sockets/Makefile	2012-04-01 15:57:19 +0000
@@ -24,13 +24,13 @@
 
 socket_test:	base_socket.o socket_test.cpp
 	@rm -f socket_test
-	g++ -c -O3 socket_test.cpp -I ../include
-	g++ -o socket_test socket_test.o base_socket.o ../obj/hires_timer.o ../obj/common.o ../obj/base_thread.o -lpthread 
+	g++ -c -O3 socket_test.cpp -I ../include -std=gnu++0x 
+	g++ -o socket_test socket_test.o base_socket.o ../obj/hires_timer.o ../obj/common.o ../obj/base_thread.o -lpthread  -std=gnu++0x
 
 
 base_socket.o:	base_socket.cpp base_socket.h Makefile
 	@rm -f base_socket.o
-	g++ -c -O3 base_socket.cpp -I ../include
+	g++ -c -O3 base_socket.cpp -I ../include -std=gnu++0x 
 
 clean:
 	@rm -vf *.o ../include/base_socket.h socket_test

=== modified file 'common/sockets/base_socket.cpp'
--- common/sockets/base_socket.cpp	2011-09-17 01:21:36 +0000
+++ common/sockets/base_socket.cpp	2012-04-01 15:57:19 +0000
@@ -605,32 +605,21 @@
   // take action only if the listen thread is running.
   if (listening())
   {
-	// stop the listener thread
-	if (is_running()) stop();
-	// wait here until the thread stops.
-	if (is_running()) join();
-//	try
-//	{ 
-//	  if (listen_sock) close(listen_sock);
-//	}
-//	catch (...) {};
+    // stop the listener thread
+    stop();
+    join();
   };
 };
 
 bool tcp_server_socket_factory::listening()
 {
-	bool running = is_running();
-/*	if (!running)
-	{
-		// check to be sure the thread did not die due to an error.
-		if (_last_thread_fault != 0)
-		{
-		  // if thread_return was non-zero, it is assumed the thread died an
-		  // evil and useless death. Scream in anger!
-		  throw listen_terminated_exception();
-		};
-	};
-*/	return running;
+  bool running = is_running();
+  return running;
+};
+
+int tcp_server_socket_factory::last_fault()
+{
+  return _last_thread_fault;
 };
 
 unsigned short int tcp_server_socket_factory::backlog()
@@ -643,108 +632,104 @@
 {
   std::cerr << "in thread cleanup sock closer" << std::endl;
   int & socket = *(static_cast<int*>(sock));
-  ::close(socket);
+  try { ::close(socket); } catch  (...) {};
   std::cerr << "socker closer done." << std::endl;
 };
 
 void tcp_server_socket_factory::run()
 {
-  /* Put this entire thing in a try block to protect the application. 
-	* Without this, an untrapped exception thrown here or in the 
-	* user supplied on_accept() method would abort the entire 
-	* application instead of just this
-	* thread.
-	*/
+  // set up the listening socket.
   int listen_sock;
-  // make sure the listener is closed when we exit.
-  pthread_cleanup_push(closeme, (void*) &listen_sock);
+  _last_thread_fault = 0;
+  bool go = false;
   try
   {
-	bool go = true;
-	// set up our listening socket.
-	listen_sock = socket(AF_INET,SOCK_STREAM,0);
-	sockaddr_in myaddr;
-	try
-	{
-		myaddr = tcp_socket::str_to_sockaddr(_address);
-	}
-	catch (...)
-	{
-		// probably a tcp_socket::bad_address_exception, 
-		// but any reason will do.
-		go = false;
-	};
-	if (bind(listen_sock,(sockaddr *) &myaddr,sizeof(myaddr)))
-	{
-		// bind did not work.
-		go = false;
-	};	
-	if (listen(listen_sock,_backlog))
-	{
-		// listen failed in some way.. I don't care which.
-		go = false;
-	};
-	// processing loop
-	while (go)
-	{
-	  // accept a new connection
-	  bool good_connect = true;
-	  int new_conn = accept(listen_sock,NULL,NULL);
-	  // validate the accept return value.
-	  if (new_conn == -1)
-	  {
-		// accept returned an error.
-		switch (errno) 
-		{
-//			case ENETDOWN :
-			case EPROTO :
-//			case ENOPROTOOPT :
-			case EHOSTDOWN :
-//			case ENONET :
-			case EHOSTUNREACH :
-//			case EOPNOTSUPP :
-//			case ENETUNREACH :
-			case EAGAIN :
-//			case EPERM :
-			case ECONNABORTED :
-				{
-				  good_connect = false;
-				  break;
-				};
-			default : 
-				{	
-				  // for any other error, we're going to shutdown the 
-				  // this listener thread.
-				  go = false;
-				  good_connect = false;
-				  _last_thread_fault = errno;
-				  break;
-				};
-		};  // switch (errno)
-	  }; // error thrown by accept.
-	  if (good_connect)
-	  {
-		// create connect_sock
-		connect_sock.reset(new tcp_socket(new_conn));
-		// make the thread easily cancelable.
-		set_cancel_anytime();
-		// call on_accept
-		go = on_accept();
-		// set back to cancel_deferred.
-		set_deferred_cancel();
-		// release our claim to the new socket
-		connect_sock.reset();
-	  };
-	}; // while go;
+    listen_sock = socket(AF_INET,SOCK_STREAM,0);
+    sockaddr_in myaddr;
+    myaddr = tcp_socket::str_to_sockaddr(_address);
+    int a = bind(listen_sock,(sockaddr *) &myaddr,sizeof(myaddr));
+    int b = listen(listen_sock,_backlog);
+    if (a || b)
+    {
+      go = false;
+      if (a) _last_thread_fault += 1;
+      if (b) _last_thread_fault += 2;
+    }
+    else go = true;
   }
   catch (...)
   {
-	/* an untrapped exception was thrown by someone in this thread.
-	* We'll shutdown this thread and put -1 in the thread_return field
-	* to let the world know that we don't know what killed us.
-	*/
-	_last_thread_fault = -1;
-  };
+    _last_thread_fault = 100;
+  };
+  // if not in a good state, terminate the thread.
+  if (!go)
+  {
+    _last_thread_fault =+ 200;
+    exit(0);
+  };
+  // make sure the listener is closed when we exit.
+  pthread_cleanup_push(closeme, (void*) &listen_sock);
+  // processing loop
+  while (go)
+  {
+    // accept a new connection
+    bool good_connect = true;
+    int new_conn = accept(listen_sock,NULL,NULL);
+    // validate the accept return value.
+    if (new_conn == -1)
+    {
+      // accept returned an error.
+      switch (errno)
+      {
+        case EPROTO :
+        case EHOSTDOWN :
+        case EHOSTUNREACH :
+        case EAGAIN :
+        case ECONNABORTED :
+        {
+          // abandon this connection
+          good_connect = false;
+          break;
+        };
+        default :
+        {
+          // for any other error, we're going to shutdown the
+          // this listener thread.
+          go = false;
+          good_connect = false;
+          _last_thread_fault = errno;
+          break;
+        };
+      };  // switch (errno)
+    }; // error thrown by accept.
+    if (good_connect)
+    {
+      connect_sock.reset(new tcp_socket(new_conn));
+      set_cancel_anytime();
+      // call the connection handler.
+      try
+      {
+        go = on_accept();
+      }
+      catch (...)
+      {
+        go = false;
+        _last_thread_fault = 501;
+      };
+      set_deferred_cancel();
+      // safety check.
+      if (connect_sock)
+      {
+        std::cerr << "WARNING: on_accept() did not take ownership of "
+          << "connect_sock.\n"
+          << "  This can lead to leaks and should be fixed."
+          << std::endl;
+        connect_sock.reset();
+        _last_thread_fault = 500;
+        go = false;
+      };
+    };
+  }; // while go;
   pthread_cleanup_pop(0);
 };
 

=== modified file 'common/sockets/base_socket.h'
--- common/sockets/base_socket.h	2011-09-17 00:33:23 +0000
+++ common/sockets/base_socket.h	2012-04-01 15:57:19 +0000
@@ -20,7 +20,7 @@
 #define base_socket_header
 
 #include <base_thread.h>
-#include <boost/shared_ptr.hpp>
+#include <memory>
 #include <sys/socket.h>
 #include <netinet/in.h>
 
@@ -373,7 +373,7 @@
 };
 
 /// smart pointer for use with tcp_sockets
-typedef boost::shared_ptr<nrtb::tcp_socket> tcp_socket_p;
+typedef std::unique_ptr<nrtb::tcp_socket> tcp_socket_p;
 
 /** Abstract "listener" TCP/IP socket for servers. 
  ** 

=== modified file 'common/sockets/socket_test.cpp'
--- common/sockets/socket_test.cpp	2011-09-17 01:21:36 +0000
+++ common/sockets/socket_test.cpp	2012-04-01 15:57:19 +0000
@@ -21,7 +21,6 @@
 #include <string>
 #include <boost/random.hpp>
 #include "base_socket.h"
-#include <boost/shared_ptr.hpp>
 
 using namespace nrtb;
 using namespace std;
@@ -29,46 +28,48 @@
 class myserver: public tcp_server_socket_factory
 {
 public:
-	int hits;
-	int errors;
+  int hits;
+  int errors;
 
-	// constructor
-	myserver(const string & a, const unsigned short int & b) 
-		: tcp_server_socket_factory(a,b)
-	{
-		// Don't need to lock here because we know the 
-		// listener thread is not running.
-		hits = 0;
-		errors = 0;
-	};
+  // constructor
+  myserver(const string & a, const unsigned short int & b)
+    : tcp_server_socket_factory(a,b)
+  {
+    // Don't need to lock here because we know the
+    // listener thread is not running.
+    hits = 0;
+    errors = 0;
+  };
 
 protected:
-	// on_accept() is called on each connection.
-	bool on_accept()
-	{
-		try
-		{
-			// just return what we've recieved.
-			string msg = connect_sock->getln();
-			connect_sock->put(msg);
-			// Update our hit count. 
-			hits++;
-		}
-		catch (base_exception & e)
-		{
-		  errors++;
-		  cerr << "server Caught " << e.what() << endl;
-		}
-		catch (...)
-		{
-		  errors++;
-		  cerr << "Unexpected error in on_accept()" << endl;
-		};
-		if (hits > 99) 
-		  return false;
-		else
-		  return true;
-	};
+
+  // on_accept() is called on each connection.
+  bool on_accept()
+  {
+    try
+    {
+      tcp_socket_p sock = std::move(connect_sock);
+      // just return what we've recieved.
+      string msg = sock->getln();
+      sock->put(msg);
+      // Update our hit count.
+      hits++;
+    }
+    catch (base_exception & e)
+    {
+      errors++;
+      cerr << "server Caught " << e.what() << endl;
+    }
+    catch (...)
+    {
+      errors++;
+      cerr << "Unexpected error in on_accept()" << endl;
+    };
+    if (hits > 99)
+      return false;
+    else
+      return true;
+  };
 };
 
 string transceiver(const string address, const string sendme)
@@ -78,7 +79,7 @@
   sender.connect(address);
   sender.put(sendme);
   returnme = sender.getln();
-  sender.close();//cerr << "tc>> sock closed" << endl;
+  sender.close();
   return returnme;
 };
 
@@ -101,86 +102,103 @@
 
   try
   {
-	// start the receiver/server
-	test_server.start_listen();
-	usleep(5e5);
-	
-	// Send test messages
-	for (int i = 0; i < 100; i++)
-	{
-	  stringstream msg;
-	  msg << "test message " << i << "\r";
-	  string checkme = msg.str();
-	  string returned = transceiver(address, checkme);
-	  if (returned != checkme)
-	  {
-		er_count++;
-	  };
-	  cout << returned.substr(0,returned.size()-1) << ": " 
-		<< ((returned == checkme) ? "Passed" : "Failed")
-		<< endl;
-	};
+    // start the receiver/server
+    test_server.start_listen();
+    int countdown = 99;
+    while ((!test_server.listening()) and countdown)
+    {
+      usleep(1e3);
+      countdown++;
+    };
+    if (!countdown)
+    {
+      cerr << "Could not start listener." << endl;
+      exit(1);
+    }
+    cout << "test_server ready." << endl;
+
+    // Send test messages
+    for (int i = 0; i < 100; i++)
+    {
+      stringstream msg;
+      msg << "test message " << i << "\r";
+      string checkme = msg.str();
+      string returned = transceiver(address, checkme);
+      if (returned != checkme)
+      {
+        er_count++;
+      };
+      cout << returned.substr(0,returned.size()-1) << ": "
+      << ((returned == checkme) ? "Passed" : "Failed")
+      << endl;
+    };
   }
   catch (myserver::bind_failure_exception)
   {
-	  cout << "Could not bind port" << endl;
+    cout << "Could not bind port" << endl;
   }
   catch (myserver::mem_exhasted_exception)
   {
-	  cout << "myserver reports out of memory." << endl;
+    cout << "myserver reports out of memory." << endl;
   }
   catch (myserver::listen_terminated_exception)
   {
-	  cout << "Listener terminated unexpectedly." << endl;
+    cout << "Listener terminated unexpectedly." << endl;
   }
   catch (myserver::on_accept_bound_exception)
   {
-	  cout << "myserver::on_accept() seems bound." << endl;
+    cout << "myserver::on_accept() seems bound." << endl;
   }
   catch (tcp_socket::bad_connect_exception & e)
   {
-	  cout << "A bad_connect_exception was thrown.\n" 
-		<< e.comment() << endl;
+    cout << "A bad_connect_exception was thrown.\n"
+      << "  comment: " << e.comment() << endl;
+    cout << "  test_server.last_fault() = "
+      << test_server.last_fault() << endl;
   }
   catch (tcp_socket::not_open_exception & e)
   {
-	  cout << "A tcp not open exception was caught.\n" 
-		<< e.comment() << endl;
+    cout << "A tcp not open exception was caught.\n"
+      << e.comment() << endl;
   }
   catch (tcp_socket::close_exception & e)
   {
-	  cout << "A close_exception was caught.\n" 
-		<< e.comment() << endl;
+    cout << "A close_exception was caught.\n"
+      << e.comment() << endl;
   }
   catch (tcp_socket::overrun_exception & e)
   {
-	  cout << "An overrun_exception was caught.\n" 
-		<< e.comment() << endl;
+    cout << "An overrun_exception was caught.\n"
+      << e.comment() << endl;
   }
   catch (tcp_socket::buffer_full_exception & e)
   {
-	  cout << "A buffer_full_exception was caught.\n" 
-		<< e.comment() << endl;
+    cout << "A buffer_full_exception was caught.\n"
+      << e.comment() << endl;
   }
   catch (tcp_socket::general_exception & e)
   {
-	  cout << "A tcp_socket exception was caught.\n" 
-		<< e.comment() << endl;
+    cout << "A tcp_socket exception was caught.\n"
+      << "  comment: " << e.comment() << endl;
+    cout << "  test_server.last_fault() = "
+      << test_server.last_fault() << endl;
   }
   catch (exception & e)
   {
-	  cout << "A unexpected " << e.what() << " exception was caught." << endl;
+    cout << "A unexpected " << e.what()
+      << " exception was caught." << endl;
   };
 
   // final check.
   if (test_server.hits != 100)
   {
-	er_count++;
-	cout << "Server does not report the proper number of hits.\n"
-	  << "\tExpected 100, got " << test_server.hits 
-	  << endl;
+    er_count++;
+    cout << "Server does not report the proper number of hits.\n"
+      << "\tExpected 100, got " << test_server.hits
+      << endl;
   };
-  cout << "=========== tcp_socket test complete =============" << endl;
+  cout << "=========== tcp_socket test complete ============="
+    << endl;
   
   return er_count;
 };

=== modified file 'common/transceiver/Makefile'
--- common/transceiver/Makefile	2011-08-25 04:22:52 +0000
+++ common/transceiver/Makefile	2012-04-01 15:57:19 +0000
@@ -23,8 +23,8 @@
 
 transceiver_test:	transceiver.h transceiver_test.cpp
 	@rm -f transceiver_test
-	g++ -c transceiver_test.cpp -I../include
-	g++ -o transceiver_test transceiver_test.o ../obj/common.o ../obj/log_setup.o ../obj/serializer.o ../obj/base_thread.o ../obj/base_socket.o ../obj/confreader.o -lpthread -lprotobuf  ../lib/nrtb_gpb.a -lPocoFoundation
+	g++ -c transceiver_test.cpp -I../include -std=gnu++0x
+	g++ -o transceiver_test transceiver_test.o ../obj/common.o ../obj/log_setup.o ../obj/serializer.o ../obj/base_thread.o ../obj/base_socket.o ../obj/confreader.o -lpthread -lprotobuf  ../lib/nrtb_gpb.a -lPocoFoundation -std=gnu++0x
 
 clean:
 	@rm -rvf *.o transceiver_test ../include/transceiver.h *.log ../obj/transceiver.o

=== modified file 'common/transceiver/transceiver.h'
--- common/transceiver/transceiver.h	2011-09-15 22:53:06 +0000
+++ common/transceiver/transceiver.h	2012-04-01 15:57:19 +0000
@@ -26,7 +26,6 @@
 #include <serializer.h>
 #include <confreader.h>
 #include <Poco/Logger.h>
-#include <boost/shared_ptr.hpp>
 #include <boost/circular_buffer.hpp>
 
 namespace nrtb
@@ -51,8 +50,8 @@
    * specification this class implements.
    * ***************************************************************/
   template <class out, class in,
-	class outp = boost::shared_ptr<out>,
-	class inp = boost::shared_ptr<in> >
+	class outp = std::unique_ptr<out>,
+	class inp = std::unique_ptr<in> >
   class transceiver
   {
 	public:
@@ -70,12 +69,17 @@
 	   * socket. Once created this class assumes it uniquely owns the 
 	   * socket and will close it upon distruction.
 	   * ***********************************************************/
-	  transceiver(tcp_socket_p socket);
+	  transceiver(tcp_socket_p & socket);
 	  /*************************************************************
 	   * Closes the socket and releases all mmemory associated with
 	   * this class.
 	   * ***********************************************************/
 	  virtual ~transceiver();
+	  /*************************************************************
+	   * is_connected() returns true if the socket is up and ready
+	   * to use, false otherwise.
+	   *************************************************************/
+	   bool is_connected();
 	  /**************************************************************
 	   * gets the next message from the socket. If no messages are 
 	   * ready, blocks util one arrives. 
@@ -85,7 +89,7 @@
 	   * Sends a message over the socket and adds it to the 
 	   * sent_messages buffer in case it's needed for error recovery.
 	   * ***********************************************************/
-	  void send(outp sendme);
+	  void send(outp & sendme);
 	  /**************************************************************
 	   * Called by the data consumer when an inbound message was 
 	   * not valid in the current application context. msg_number
@@ -124,7 +128,7 @@
 	  unsigned long long  last_inbound;
 	  /// buffer to hold previously sent messages; required for 
 	  /// error recovery.
-	  boost::circular_buffer<outp> sent_messages;
+	  boost::circular_buffer<out> sent_messages;
 	  /// fence post for recovery efforts, zero if none in play
 	  unsigned long long nak_fence_post;
 	  /// These methods implment actual nak recovery.
@@ -136,7 +140,7 @@
 serializer tscvr_sequence(0);
 
 template <class out, class in, class outp, class inp>
-transceiver<out,in,outp,inp>::transceiver(tcp_socket_p socket)
+transceiver<out,in,outp,inp>::transceiver(tcp_socket_p & socket)
 {
   // get the configuration parameters.
   global_conf_reader & config = global_conf_reader::get_instance();
@@ -152,7 +156,7 @@
   logname = s.str();
   Poco::Logger & log = Poco::Logger::get(logname);
   // set up the socket.
-  sock = socket;
+  sock = std::move(socket);
   // annouce ourselves...
   log.information("Instanciated."); 
   s.str("");
@@ -173,10 +177,10 @@
   // shutdown and release  the socket.
   try 
   {
-	if (sock)
-	{
-	  sock.reset(); 
-	};
+	  if (sock)
+	  {
+	    sock.reset(); 
+	  };
   } catch (...) {};
   // discard the sent messages list.
   sent_messages.clear();
@@ -184,65 +188,69 @@
 };
 
 template <class out, class in, class outp, class inp>
+bool transceiver<out,in,outp,inp>::is_connected()
+{
+  bool returnme = false;
+  if (sock and (sock->status() == tcp_socket::sock_connect))
+  {
+    returnme = true;
+  };
+  return returnme;
+};
+
+template <class out, class in, class outp, class inp>
 inp transceiver<out,in,outp,inp>::get()
 {
   // get the message length first.
   std::string len_field = sock->get(4,10);
-//std::cout << "len_field=" << http_chartohex(len_field) << std::endl;  
   msg_num_t msg_len;
   for (int i=0; i<4; i++)
   {
-	msg_len.bytes[i] = len_field[i];
+  	msg_len.bytes[i] = len_field[i];
   };
-//std::cout << ":len=" << msg_len.number << std::endl;
   // get the rest of the message.
   inp returnme(new in);
   std::string input = sock->get(msg_len.number);
-//std::cout << ":received=" << http_chartohex(input) << std::endl;
   returnme->ParseFromString(input);
   // for the first messsge any number is
   // accepted.
   if (last_inbound == 0)
   {
-	last_inbound = returnme->msg_uid();
+  	last_inbound = returnme->msg_uid();
   }
   else
   {
-	last_inbound++;
-	int temp = returnme->msg_uid();
-	if (temp != last_inbound)
-	{ 
-	  inbound_seq_error e;
-	  std::stringstream message;
-	  message << "Expected " << last_inbound
-		<< " received "  << temp;
-	  e.store(message.str());
-	  throw e;
-	};
+	  last_inbound++;
+	  int temp = returnme->msg_uid();
+	  if (temp != last_inbound)
+	  { 
+	    inbound_seq_error e;
+	    std::stringstream message;
+	    message << "Expected " << last_inbound
+		  << " received "  << temp;
+	    e.store(message.str());
+	    throw e;
+	  };
   };
   return returnme;
 };
 
 template <class out, class in, class outp, class inp>
-void transceiver<out,in,outp,inp>::send(outp sendme)
+void transceiver<out,in,outp,inp>::send(outp & sendme)
 {
   sendme->set_msg_uid(out_msg_num());
   std::string output;
   output = sendme->SerializeAsString();
   msg_num_t msg_len;
   msg_len.number = output.size();
-//std::cout << "num:len" << msg_len.number << ":" << output.length() << //std::endl;
   std::string num_field = "    ";
   for (int i=0; i<4; i++)
   {
-	num_field[i] = msg_len.bytes[i];
-//std::cout << int(num_field[i]) << "," ;
+	  num_field[i] = msg_len.bytes[i];
   };
-//std::cout << " = " << msg_len.number << std::endl;  
   output = num_field + output;
-//std::cout << "out msg=" << http_chartohex(output) << std::endl;  
   sock->put(output);
-  sent_messages.push_back(sendme);
+  sent_messages.push_back(*sendme);
 };
 
 template <class out, class in, class outp, class inp>
@@ -279,4 +287,4 @@
 
 } // namespace nrtb
  
-#endif //nrtb_transceiver_h//
\ No newline at end of file
+#endif //nrtb_transceiver_h//

=== modified file 'common/transceiver/transceiver_test.cpp'
--- common/transceiver/transceiver_test.cpp	2011-09-17 01:08:29 +0000
+++ common/transceiver/transceiver_test.cpp	2012-04-01 15:57:19 +0000
@@ -45,14 +45,14 @@
 
   void inc()
   {
-	scope_lock lock(data_lock);
-	er_count++;
+	  scope_lock lock(data_lock);
+	  er_count++;
   };
 
   int operator ()()
   {
-	scope_lock lock(data_lock);
-	return er_count;
+	  scope_lock lock(data_lock);
+	  return er_count;
   };
 };
 
@@ -64,90 +64,124 @@
   
   tcp_socket_p sock;
   unsigned long long last_inbound;
+
+  server_work_thread()
+  {
+    cout << "Creating server_work_thread." << endl;
+    last_inbound = 0;
+  }
   
   ~server_work_thread()
   {
-	cout << "Destructing server_work_thread" << endl;
-	sock.reset();
+	  cout << "Destructing server_work_thread" << endl;
   };
   
   void run()
   {
-	set_cancel_anytime();
-	linkt link(sock);
-	while (sock->status() == tcp_socket::sock_connect)
-	{
-	  try 
-	  {
-		linkt::out_ptr inbound = link.get();
-		last_inbound = inbound->msg_uid();
-		cout << "\tReceived #" << last_inbound << endl;
-		link.send(inbound);
-		if (last_inbound == 99)
-		{
-		  cout << "Receiver thread closing." << endl;
-		  exit(0);
-		};
-	  }
-	  catch (linkt::general_exception & e)
-	  {
-		cerr << "Server work thread caught " << e.what()
-		  << "\n\tComment: " << e.comment() << endl;
-		er_count.inc();;
-	  }
-	  catch (tcp_socket::general_exception & e)
-	  {
-		cerr << "Server work thread caught " << e.what()
-		  << "\n\tComment: " << e.comment() << endl;
-		er_count.inc();;
-	  }
-	  catch (std::exception & e)
-	  {
-		cerr << "Server work thread caught " << e.what() 
-		  << endl;
-		er_count.inc();;
+	  set_cancel_anytime();
+	  linkt link(sock);
+	  while (link.is_connected())
+	  {
+	    try 
+	    {
+		    linkt::in_ptr inbound = link.get();
+		    last_inbound = inbound->msg_uid();
+		    link.send(inbound);
+		    if (last_inbound == 99)
+		    {
+		      cout << "Receiver thread closing." << endl;
+		      exit(0);
+		    };
+      }
+      catch (linkt::general_exception & e)
+      {
+	      cerr << "Server work thread caught " << e.what()
+	        << "\n\tComment: " << e.comment() << endl;
+	      er_count.inc();;
+      }
+      catch (tcp_socket::general_exception & e)
+      {
+	      cerr << "Server work thread caught " << e.what()
+	        << "\n\tComment: " << e.comment() << endl;
+	      er_count.inc();;
+      }
+      catch (std::exception & e)
+      {
+	      cerr << "Server work thread caught " << e.what() 
+	        << endl;
+	      er_count.inc();;
+	    };
 	  };
-	};
   };
 };
 
 class listener: public tcp_server_socket_factory
 {
-protected:
-  boost::shared_ptr<server_work_thread> task;
-
 public:
+
+  std::unique_ptr<server_work_thread> task;
+
   listener(const string & add, const int & back)
-   : tcp_server_socket_factory(add, back) {};
+   : tcp_server_socket_factory(add, back)
+  {
+    cout << "Listener constructed." << endl;
+  };
+   
   ~listener()
   {
-	cout << "Destructing listener" << endl;
-	task.reset();
+	  cout << "Destructing listener" << endl;
+	  // check to see if the listener is still up.
+	  try
+	  {
+	    if (listening())
+      {
+        cerr << "  Listener is still running...";
+        stop_listen();
+        cerr << " shutdown is complete." << endl;
+      };
+	  }
+	  catch (...)
+	  {
+	    cerr << "  Presuming listener is down." << endl;
+	  };
+    if (!task)
+    {
+      cerr << "  Task is not allocated. " << endl;
+    }
+	  // check to see if task is still running and display
+	  // a warning if it is.
+	  if (task and (task->is_running()))
+	  {
+	    cerr << "WARNING: Worker is still running!!" << endl;
+	    task->stop();
+	    task->join();
+	    cerr << "Worker thread shutdown is complete." << endl;
+	  };
   };
   
   bool on_accept()
   {
-	if (!task)
-	{
-	  task.reset(new server_work_thread);
-	  task->last_inbound = 0;
-	  task->sock = connect_sock;
-	  task->start(*(task.get()));
-	  cout << "server thread running." << endl;
-	  // shutdown the listener thead.. our work is done here.
-	  return false;
-	}
-	else
-	{
-	  connect_sock->close();
-	  cerr << "Multiple attempts to connect to server" 
-		<< endl;
-	};
+	  if (!task)
+	  {
+      cout << "In listener::on_accept()" << endl;
+      task.reset(new server_work_thread);
+      task->sock = std::move(connect_sock);
+      task->start();
+      cout << "server thread started." << endl;
+	    // shutdown the listener thead.. our work is done here.
+	    return false;
+	  }
+	  else
+	  {
+	    connect_sock->close();
+	    cerr << "Multiple attempts to connect to server" 
+		  << endl;
+	  };
   };
 };
 
 string address = "127.0.0.1:";
-int port_base = 12334;
+int port_base = 14334;
 
 int main()
 {
@@ -155,51 +189,77 @@
 
   try
   {
-	//set up our port and address
-	boost::mt19937 rng;
-	rng.seed(time(0));
-	boost::uniform_int<> r(0,1000);
-	stringstream s;
-	s << address << port_base + r(rng);
-	address = s.str();
-	cout << "Using " << address << endl;
-
-	// kick off the listener thread.
-	listener server(address,5);
-	server.start_listen();
-	usleep(1e4);
-
-	// set up our sender
-	tcp_socket_p sock(new tcp_socket);
-	sock->connect(address);
-	linkt sender(sock);
-
-	// Let's send a few things.
-	for (int i=0; i<100; i++)
-	{
-	  linkt::out_ptr msg(new my_msg);
-	  sender.send(msg);
-	  cout << "Sent " << msg->msg_uid() << endl;
-	  msg = sender.get();
-	  cout << "Got back " << msg->msg_uid() << endl;
-	};
-	usleep(1e4);
+    //set up our port and address
+    boost::mt19937 rng;
+    rng.seed(time(0));
+    boost::uniform_int<> r(0,1000);
+    stringstream s;
+    s << address << port_base + r(rng);
+    address = s.str();
+    cout << "Using " << address << endl;
+
+    // kick off the listener thread.
+    listener server(address,5);
+    server.start_listen();
+    while (!server.listening())
+    {
+      usleep(1e3);
+    };
+    cout << "Listener thread is ready." << endl;
+
+    // set up our sender
+    tcp_socket_p sock(new tcp_socket);
+    int trycount = 0;
+    while (sock->status() != tcp_socket::sock_connect)
+    {
+      try
+      {
+        sock->connect(address);
+      }
+      catch (tcp_socket::bad_connect_exception & e)
+      {
+        trycount++;
+        if (trycount > 99)
+        {
+          cerr << "Too many connect failures for the sender socket."
+            << endl;
+          throw e;
+        };  
+        usleep(1e4);
+      };
+    }
+    cout << "sender socket is connected to listener" << endl;
+    linkt sender(sock);
+    cout << "Sender transciever is ready to use." << endl;
+
+    // Let's send a few things.
+    for (int i=0; i<100; i++)
+    {
+      linkt::out_ptr msg(new my_msg);
+      sender.send(msg);
+      cout << "Sent " << msg->msg_uid() << ", ";
+      msg = sender.get();
+      cout << "good return." << endl;
+    };
+    usleep(1e4);
   }
   catch (...)
   {
-	cout << "exception caught during test." << endl;
-	er_count.inc();
+	  cout << "exception caught during test." << endl;
+	  er_count.inc();
   };
 
   int faults = er_count(); 
   if (faults)
   {
-	cout << "========== ** There were " << faults 
-	  << "errors logged. =========" << endl; 
+	  cout << "========== ** There were " << faults
+	    << " errors logged. =========" << endl; 
   }
   else
-	cout << "========= nrtb::transceiver test complete.=========" 
-	  << endl;
+  {
+	  cout << "========= nrtb::transceiver test complete.=========" 
+	    << endl;
+  };
 
   return faults;
-};
\ No newline at end of file
+};

_______________________________________________
Mailing list: https://launchpad.net/~nrtb-core
Post to     : nrtb-core@lists.launchpad.net
Unsubscribe : https://launchpad.net/~nrtb-core
More help   : https://help.launchpad.net/ListHelp

Reply via email to