Author: gsim
Date: Thu Jan 31 10:50:46 2008
New Revision: 617188

URL: http://svn.apache.org/viewvc?rev=617188&view=rev
Log:
Make ports accesible through socket interface.
Add local port to each logged frame in the client Connector


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=617188&r1=617187&r2=617188&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Thu Jan 31 
10:50:46 2008
@@ -29,12 +29,15 @@
 #include "qpid/sys/Poller.h"
 #include "qpid/Msg.h"
 #include <boost/bind.hpp>
+#include <boost/format.hpp>
 
 namespace qpid {
 namespace client {
 
 using namespace qpid::sys;
 using namespace qpid::framing;
+using boost::format;
+using boost::str;
 
 Connector::Connector(
     ProtocolVersion ver, bool _debug, uint32_t buffer_size
@@ -59,7 +62,7 @@
     Mutex::ScopedLock l(closedLock);
     assert(closed);
     socket.connect(host, port);
-    identifier=socket.getPeerAddress();
+    identifier=str(format("[%1% %2%]") % socket.getLocalPort() % 
socket.getPeerAddress());
     closed = false;
     poller = Poller::shared_ptr(new Poller);
     aio = new AsynchIO(socket,
@@ -175,7 +178,9 @@
     ~Buff() { delete [] bytes;}
 };
 
-Connector::Writer::Writer() : aio(0), buffer(0), lastEof(0) {}
+Connector::Writer::Writer() : aio(0), buffer(0), lastEof(0) 
+{
+}
 
 Connector::Writer::~Writer() { delete buffer; }
 
@@ -183,6 +188,7 @@
     Mutex::ScopedLock l(lock);
     aio = a;
     newBuffer(l);
+    identifier = str(format("[%1% %2%]") % aio->getSocket().getLocalPort() % 
aio->getSocket().getPeerAddress());
 }
 
 void Connector::Writer::handle(framing::AMQFrame& frame) { 
@@ -192,7 +198,7 @@
         lastEof = frames.size();
         aio->notifyPendingWrite();
     }
-    QPID_LOG(trace, "SENT (" << aio->getSocket().getPeerAddress() << "): " << 
frame);
+    QPID_LOG(trace, "SENT " << identifier << ": " << frame);
 }
 
 void Connector::Writer::writeOne(const Mutex::ScopedLock& l) {
@@ -235,7 +241,7 @@
 
     AMQFrame frame;
     while(frame.decode(in)){
-        QPID_LOG(trace, "RECV (" << identifier << "): " << frame);
+        QPID_LOG(trace, "RECV " << identifier << ": " << frame);
         input->received(frame);
     }
     // TODO: unreading needs to go away, and when we can cope

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h?rev=617188&r1=617187&r2=617188&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h Thu Jan 31 
10:50:46 2008
@@ -59,6 +59,7 @@
         size_t lastEof; // Position after last EOF in frames
         framing::Buffer encode;
         size_t framesEncoded;
+        std::string identifier;
         
         void writeOne(const sys::Mutex::ScopedLock&);
         void newBuffer(const sys::Mutex::ScopedLock&);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h?rev=617188&r1=617187&r2=617188&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h Thu Jan 31 10:50:46 2008
@@ -87,7 +87,16 @@
      * socket
      */
     std::string getPeerAddress() const;
+    /** 
+     * Returns an address (host and port) for the local end of the
+     * socket
+     */
+    std::string getLocalAddress() const;
 
+    uint getLocalPort() const;
+    uint getRemotePort() const;
+
+    
     /** Accept a connection from a socket that is already listening
      * and has an incoming connection
      */

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp?rev=617188&r1=617187&r2=617188&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp Thu Jan 31 
10:50:46 2008
@@ -30,6 +30,7 @@
 #include <sys/errno.h>
 #include <netinet/in.h>
 #include <netdb.h>
+#include <cstdlib>
 
 #include <boost/format.hpp>
 
@@ -45,6 +46,7 @@
     int fd;
 
     std::string getName(bool local, bool includeService = false) const;   
+    std::string getService(bool local) const;   
 };
 
 std::string SocketPrivate::getName(bool local, bool includeService) const
@@ -77,6 +79,28 @@
     }
 }
 
+std::string SocketPrivate::getService(bool local) const
+{
+    ::sockaddr_storage name; // big enough for any socket address    
+    ::socklen_t namelen = sizeof(name);
+    
+    int result = -1;
+    if (local) {
+        result = ::getsockname(fd, (::sockaddr*)&name, &namelen);
+    } else {
+        result = ::getpeername(fd, (::sockaddr*)&name, &namelen);
+    }
+
+    QPID_POSIX_CHECK(result);
+
+    char servName[NI_MAXSERV];
+    if (int rc=::getnameinfo((::sockaddr*)&name, namelen, 0, 0, 
+                                 servName, sizeof(servName), 
+                                 NI_NUMERICHOST | NI_NUMERICSERV) != 0)
+        throw QPID_POSIX_ERROR(rc);
+    return servName;
+}
+
 Socket::Socket() :
        impl(new SocketPrivate)
 {
@@ -229,6 +253,21 @@
 std::string Socket::getPeerAddress() const
 {
     return impl->getName(false, true);
+}
+
+std::string Socket::getLocalAddress() const
+{
+    return impl->getName(true, true);
+}
+
+uint Socket::getLocalPort() const
+{
+    return atoi(impl->getService(true).c_str());
+}
+
+uint Socket::getRemotePort() const
+{
+    return atoi(impl->getService(true).c_str());
 }
 
 int Socket::toFd() const {


Reply via email to