Author: aconway
Date: Wed Nov  8 17:29:59 2006
New Revision: 472732

URL: http://svn.apache.org/viewvc?view=rev&rev=472732
Log:
More separation of concerns with APR, client side complete.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/apr/Socket.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
      - copied, changed from r472545, 
incubator/qpid/trunk/qpid/cpp/src/qpid/apr/Connector.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
      - copied, changed from r472545, 
incubator/qpid/trunk/qpid/cpp/src/qpid/apr/Connector.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h   (with props)
Removed:
    incubator/qpid/trunk/qpid/cpp/src/qpid/apr/Connector.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/apr/Connector.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/apr/Monitor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Connector.h
Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/apr/Monitor.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h
    incubator/qpid/trunk/qpid/cpp/test/client/topic_publisher.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/apr/Monitor.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/apr/Monitor.h?view=diff&rev=472732&r1=472731&r2=472732
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/apr/Monitor.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/apr/Monitor.h Wed Nov  8 17:29:59 
2006
@@ -23,6 +23,7 @@
 #include "apr-1/apr_thread_mutex.h"
 #include "apr-1/apr_thread_cond.h"
 #include "APRBase.h"
+#include "APRPool.h"
 
 namespace qpid {
 namespace sys {
@@ -43,11 +44,11 @@
   public:
     typedef ScopedLock<Mutex> ScopedLock;
     
-    Mutex();
-    ~Mutex();
-    void lock() { CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); }
-    void unlock() { CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); }
-    void trylock() { CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex)); }
+    inline Mutex();
+    inline ~Mutex();
+    inline void lock();
+    inline void unlock();
+    inline void trylock();
 
   protected:
     apr_thread_mutex_t* mutex;
@@ -57,16 +58,64 @@
 class Monitor : public Mutex
 {
   public:
-    Monitor();
-    ~Monitor();
-    void wait();
-    void wait(int64_t nsecs);
-    void notify();
-    void notifyAll();
+    inline Monitor();
+    inline ~Monitor();
+    inline void wait();
+    inline void wait(int64_t nsecs);
+    inline void notify();
+    inline void notifyAll();
 
   private:
     apr_thread_cond_t* condition;
 };
+
+
+
+Mutex::Mutex() {
+    CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, 
APRPool::get()));
+}
+
+Mutex::~Mutex(){
+    CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));
+}
+
+void Mutex::lock() {
+    CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
+}
+void Mutex::unlock() {
+    CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
+}
+
+void Mutex::trylock() {
+    CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex));
+}
+
+Monitor::Monitor() {
+    CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get()));
+}
+
+Monitor::~Monitor() {
+    CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition));
+}
+
+void Monitor::wait() {
+    CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex));
+}
+
+void Monitor::wait(int64_t nsecs){
+    // APR uses microseconds.
+    apr_status_t status = apr_thread_cond_timedwait(
+        condition, mutex, nsecs/1000);
+    if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status);
+}
+
+void Monitor::notify(){
+    CHECK_APR_SUCCESS(apr_thread_cond_signal(condition));
+}
+
+void Monitor::notifyAll(){
+    CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition));
+}
 
 
 }}

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/apr/Socket.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/apr/Socket.h?view=auto&rev=472732
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/apr/Socket.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/apr/Socket.h Wed Nov  8 17:29:59 2006
@@ -0,0 +1,107 @@
+#ifndef _apr_Socket_h
+#define _apr_Socket_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <apr-1/apr_network_io.h>
+#include "APRBase.h"
+#include "APRPool.h"
+
+namespace qpid {
+namespace sys {
+
+class Socket
+{
+  public:
+    inline Socket();
+    inline ~Socket();
+    inline void setTimeout(long msecs);
+    inline void connect(const std::string& host, int port);
+    inline void close();
+
+    enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 };
+
+    inline ssize_t send(const char* data, size_t size);
+    inline ssize_t recv(char* data, size_t size);
+
+  private:
+    apr_socket_t* socket;
+};
+
+inline
+Socket::Socket()
+{
+    CHECK_APR_SUCCESS(
+        apr_socket_create(
+            &socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP,
+            APRPool::get()));
+}
+
+inline
+Socket::~Socket() { }
+
+inline void
+Socket::setTimeout(long msecs)
+{
+    apr_socket_timeout_set(socket, msecs*1000);
+}
+
+inline void
+Socket::connect(const std::string& host, int port)
+{
+    apr_sockaddr_t* address;
+    CHECK_APR_SUCCESS(
+        apr_sockaddr_info_get(
+            &address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK,
+            APRPool::get()));
+    CHECK_APR_SUCCESS(apr_socket_connect(socket, address));
+}
+
+inline void
+Socket::close()
+{
+    CHECK_APR_SUCCESS(apr_socket_close(socket));
+    socket = 0;
+}
+
+inline ssize_t
+Socket::send(const char* data, size_t size)
+{
+    apr_size_t sent = size;
+    apr_status_t status = apr_socket_send(socket, data, &sent);
+    if (!APR_STATUS_IS_TIMEUP(status)) return SOCKET_TIMEOUT;
+    if (!APR_STATUS_IS_EOF(status)) return SOCKET_EOF;
+    CHECK_APR_SUCCESS(status);
+    return sent;
+}
+
+inline ssize_t
+Socket::recv(char* data, size_t size)
+{
+    apr_size_t received = size;
+    apr_status_t status = apr_socket_recv(socket, data, &received);
+    if (!APR_STATUS_IS_TIMEUP(status)) return SOCKET_TIMEOUT;
+    CHECK_APR_SUCCESS(status);
+     return received;
+}
+
+}}
+
+
+#endif  /*!_apr_Socket_h*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/apr/Socket.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h?view=diff&rev=472732&r1=472731&r2=472732
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h Wed Nov  8 
17:29:59 2006
@@ -22,7 +22,7 @@
 #define _Connection_
 
 #include "qpid/QpidError.h"
-#include "qpid/sys/Connector.h"
+#include "qpid/client/Connector.h"
 #include "qpid/sys/ShutdownHandler.h"
 #include "qpid/sys/TimeoutHandler.h"
 
@@ -52,7 +52,7 @@
        int port;
        const u_int32_t max_frame_size;
        std::map<int, Channel*> channels; 
-       qpid::sys::Connector* connector;
+       Connector* connector;
        qpid::framing::OutputHandler* out;
        ResponseHandler responses;
         volatile bool closed;

Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (from 
r472545, incubator/qpid/trunk/qpid/cpp/src/qpid/apr/Connector.cpp)
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?view=diff&rev=472732&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/apr/Connector.cpp&r1=472545&p2=incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp&r2=472732
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/apr/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Wed Nov  8 
17:29:59 2006
@@ -17,11 +17,11 @@
  */
 #include <iostream>
 #include <qpid/QpidError.h>
-#include "APRBase.h"
+#include <qpid/sys/Time.h>
 #include "Connector.h"
 
 using namespace qpid::sys;
-using namespace qpid::sys;
+using namespace qpid::client;
 using namespace qpid::framing;
 using qpid::QpidError;
 
@@ -36,24 +36,12 @@
     timeoutHandler(0),
     shutdownHandler(0),
     inbuf(receive_buffer_size), 
-    outbuf(send_buffer_size){
-
-    APRBase::increment();
-
-    CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
-    CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, 
APR_PROTO_TCP, pool));
-}
+    outbuf(send_buffer_size){ }
 
-Connector::~Connector(){
-    apr_pool_destroy(pool);
-
-    APRBase::decrement();
-}
+Connector::~Connector(){ }
 
 void Connector::connect(const std::string& host, int port){
-    apr_sockaddr_t* address;
-    CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, host.c_str(), 
APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool));
-    CHECK_APR_SUCCESS(apr_socket_connect(socket, address));
+    socket.connect(host, port);
     closed = false;
     receiver = Thread(this);
 }
@@ -65,7 +53,7 @@
 
 void Connector::close(){
     closed = true;
-    CHECK_APR_SUCCESS(apr_socket_close(socket));
+    socket.close();
     receiver.join();
 }
 
@@ -97,32 +85,26 @@
 }
 
 void Connector::writeToSocket(char* data, size_t available){
-    apr_size_t bytes(available);
-    apr_size_t written(0);
+    size_t written = 0;
     while(written < available && !closed){
-       apr_status_t status = apr_socket_send(socket, data + written, &bytes);
-        if(status == APR_TIMEUP){
-            std::cout << "Write request timed out." << std::endl;
-        }
-        if(bytes == 0){
-            std::cout << "Write request wrote 0 bytes." << std::endl;
+       ssize_t sent = socket.send(data + written, available-written);
+        if(sent > 0) {
+            lastOut = getTimeMsecs();
+            written += sent;
         }
-        lastOut = apr_time_as_msec(apr_time_now());
-       written += bytes;
-       bytes = available - written;
     }
 }
 
-void Connector::checkIdle(apr_status_t status){
+void Connector::checkIdle(ssize_t status){
     if(timeoutHandler){
-        int64_t now = apr_time_as_msec(apr_time_now());
-        if(APR_STATUS_IS_TIMEUP(status)){
+        int64_t now = getTimeMsecs();
+        if(status == Socket::SOCKET_TIMEOUT) {
             if(idleIn && (now - lastIn > idleIn)){
                 timeoutHandler->idleIn();
             }
-        }else if(APR_STATUS_IS_EOF(status)){
+        }else if(status == Socket::SOCKET_EOF){
             closed = true;
-            CHECK_APR_SUCCESS(apr_socket_close(socket));
+            socket.close();
             if(shutdownHandler) shutdownHandler->shutdown();
         }else{
             lastIn = now;
@@ -151,11 +133,7 @@
 }
 
 void Connector::setSocketTimeout(){
-    //interval is in microseconds, timeout in milliseconds
-    //want the interval to be a bit shorter than the timeout, hence multiply
-    //by 800 rather than 1000.
-    apr_interval_time_t interval(timeout * 800);
-    apr_socket_timeout_set(socket, interval);
+    socket.setTimeout(timeout);
 }
 
 void Connector::setTimeoutHandler(TimeoutHandler* handler){
@@ -165,14 +143,15 @@
 void Connector::run(){
     try{
        while(!closed){
-           apr_size_t bytes(inbuf.available());
-            if(bytes < 1){
+            ssize_t available = inbuf.available();
+            if(available < 1){
                 THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
             }
-           checkIdle(apr_socket_recv(socket, inbuf.start(), &bytes));
+            ssize_t received = socket.recv(inbuf.start(), available);
+           checkIdle(received);
 
-           if(bytes > 0){
-               inbuf.move(bytes);
+           if(received > 0){
+               inbuf.move(received);
                inbuf.flip();//position = 0, limit = total data read
                
                AMQFrame frame;

Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h (from 
r472545, incubator/qpid/trunk/qpid/cpp/src/qpid/apr/Connector.h)
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h?view=diff&rev=472732&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/apr/Connector.h&r1=472545&p2=incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h&r2=472732
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/apr/Connector.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h Wed Nov  8 
17:29:59 2006
@@ -18,8 +18,6 @@
 #ifndef _Connector_
 #define _Connector_
 
-#include "apr-1/apr_network_io.h"
-#include "apr-1/apr_time.h"
 
 #include "qpid/framing/InputHandler.h"
 #include "qpid/framing/OutputHandler.h"
@@ -28,11 +26,11 @@
 #include "qpid/sys/ShutdownHandler.h"
 #include "qpid/sys/TimeoutHandler.h"
 #include "qpid/sys/Thread.h"
-#include "qpid/sys/Connector.h"
 #include "qpid/sys/Monitor.h"
+#include <qpid/sys/Socket.h>
 
 namespace qpid {
-namespace sys {
+namespace client {
 
     class Connector : public qpid::framing::OutputHandler, 
                       private qpid::sys::Runnable
@@ -49,8 +47,8 @@
         u_int32_t idleIn;
         u_int32_t idleOut;
 
-        TimeoutHandler* timeoutHandler;
-        ShutdownHandler* shutdownHandler;
+        qpid::sys::TimeoutHandler* timeoutHandler;
+        qpid::sys::ShutdownHandler* shutdownHandler;
        qpid::framing::InputHandler* input;
        qpid::framing::InitiationHandler* initialiser;
        qpid::framing::OutputHandler* output;
@@ -61,10 +59,9 @@
         qpid::sys::Mutex writeLock;
        qpid::sys::Thread receiver;
 
-       apr_pool_t* pool;
-       apr_socket_t* socket;
-
-        void checkIdle(apr_status_t status);
+       qpid::sys::Socket socket;
+        
+        void checkIdle(ssize_t status);
        void writeBlock(qpid::framing::AMQDataBlock* data);
        void writeToSocket(char* data, size_t available);
         void setSocketTimeout();
@@ -78,8 +75,8 @@
        virtual void init(qpid::framing::ProtocolInitiation* header);
        virtual void close();
        virtual void setInputHandler(qpid::framing::InputHandler* handler);
-       virtual void setTimeoutHandler(TimeoutHandler* handler);
-       virtual void setShutdownHandler(ShutdownHandler* handler);
+       virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler);
+       virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler);
        virtual qpid::framing::OutputHandler* getOutputHandler();
        virtual void send(qpid::framing::AMQFrame* frame);
         virtual void setReadTimeout(u_int16_t timeout);

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h?view=auto&rev=472732
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h Wed Nov  8 17:29:59 2006
@@ -0,0 +1,27 @@
+#ifndef _sys_Socket_h
+#define _sys_Socket_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <qpid/sys/platform.h>
+#include QPID_PLATFORM_H(Socket.h)
+
+
+
+#endif  /*!_sys_Socket_h*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h?view=diff&rev=472732&r1=472731&r2=472732
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h Wed Nov  8 17:29:59 2006
@@ -1,5 +1,5 @@
-#ifndef _concurrent_Time_h
-#define _concurrent_Time_h
+#ifndef _sys_Time_h
+#define _sys_Time_h
 
 /*
  *
@@ -35,4 +35,4 @@
 
 }}
 
-#endif  /*!_concurrent_Time_h*/
+#endif  /*!_sys_Time_h*/

Modified: incubator/qpid/trunk/qpid/cpp/test/client/topic_publisher.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/client/topic_publisher.cpp?view=diff&rev=472732&r1=472731&r2=472732
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/client/topic_publisher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/test/client/topic_publisher.cpp Wed Nov  8 
17:29:59 2006
@@ -114,7 +114,7 @@
                 if(!max || time > max) max = time;
                 if(!min || time < min) min = time;
                 sum += time;
-                std::cout << "Completed " << (i+1) << " of " << batchSize << " 
in " << nsecsToMsecs(time) << "ms" << std::endl;
+                std::cout << "Completed " << (i+1) << " of " << batchSize << " 
in " << time << "ms" << std::endl;
             }
             publisher.terminate();
             int64_t avg = sum / batchSize;
@@ -133,13 +133,12 @@
 Publisher::Publisher(Channel* _channel, const std::string& _controlTopic, bool 
tx) : 
     channel(_channel), controlTopic(_controlTopic), transactional(tx){}
 
-void Publisher::received(Message& msg){
+void Publisher::received(Message& ){
     //count responses and when all are received end the current batch
     Monitor::ScopedLock l(monitor);
     if(--count == 0){
         monitor.notify();
     }
-    std::cout << "Received report: " << msg.getData() << " (" << count << " 
remaining)." << std::endl;
 }
 
 void Publisher::waitForCompletion(int msgs){


Reply via email to