Added: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/posix/EventChannelTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/posix/EventChannelTest.cpp?view=auto&rev=480582
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/posix/EventChannelTest.cpp 
(added)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/posix/EventChannelTest.cpp Wed 
Nov 29 06:36:08 2006
@@ -0,0 +1,187 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <qpid/posix/EventChannel.h>
+#include <qpid/posix/check.h>
+#include <qpid/sys/Runnable.h>
+#include <qpid/sys/Socket.h>
+#include <qpid/sys/Thread.h>
+#include <qpid_test_plugin.h>
+
+#include <sys/socket.h>
+#include <signal.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include <iostream>
+
+using namespace qpid::sys;
+
+
+const char hello[] = "hello";
+const size_t size = sizeof(hello);
+
+struct RunMe : public Runnable 
+{
+    bool ran;
+    RunMe() : ran(false) {}
+    void run() { ran = true; }
+};
+
+class EventChannelTest : public CppUnit::TestCase  
+{
+    CPPUNIT_TEST_SUITE(EventChannelTest);
+    CPPUNIT_TEST(testEvent);
+    CPPUNIT_TEST(testRead);
+    CPPUNIT_TEST(testFailedRead);
+    CPPUNIT_TEST(testWrite);
+    CPPUNIT_TEST(testFailedWrite);
+    CPPUNIT_TEST(testReadWrite);
+    CPPUNIT_TEST(testAccept);
+    CPPUNIT_TEST_SUITE_END();
+
+  private:
+    EventChannel::shared_ptr ec;
+    int pipe[2];
+    char readBuf[size];
+
+  public:
+
+    void setUp()
+    {
+        memset(readBuf, size, 0);
+        ec = EventChannel::create();
+        if (::pipe(pipe) != 0) throw QPID_POSIX_ERROR(errno);
+        // Ignore SIGPIPE, otherwise we will crash writing to broken pipe.
+        signal(SIGPIPE, SIG_IGN);
+    }
+
+    // Verify that calling getEvent returns event.
+    template <class T> bool isNextEvent(T& event)
+    {
+        return &event == dynamic_cast<T*>(ec->getEvent());
+    }
+
+    template <class T> bool isNextEventOk(T& event)
+    {
+        Event* next = ec->getEvent();
+        if (next) next->throwIfError();
+        return &event == next;
+    }
+        
+    void testEvent()
+    {
+        RunMe runMe;
+        CPPUNIT_ASSERT(!runMe.ran);
+        // Instances of Event just pass thru the channel immediately.
+        Event e(runMe.functor());
+        ec->postEvent(e);
+        CPPUNIT_ASSERT(isNextEventOk(e));
+        e.dispatch();
+        CPPUNIT_ASSERT(runMe.ran);
+    }
+
+    void testRead() {
+        ReadEvent re(pipe[0], readBuf, size);
+        ec->postEvent(re);
+        CPPUNIT_ASSERT_EQUAL(ssize_t(size), ::write(pipe[1], hello, size));
+        CPPUNIT_ASSERT(isNextEventOk(re));
+        CPPUNIT_ASSERT_EQUAL(size, re.getSize());
+        CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf));
+    }
+
+    void testFailedRead() 
+    {
+        ReadEvent re(pipe[0], readBuf, size);
+        ec->postEvent(re);
+
+        // EOF before all data read.
+        ::close(pipe[1]);
+        CPPUNIT_ASSERT(isNextEvent(re));
+        CPPUNIT_ASSERT(re.hasError());
+        try {
+            re.throwIfError();
+            CPPUNIT_FAIL("Expected QpidError.");
+        }
+        catch (const qpid::QpidError&) { }
+
+        //  Bad file descriptor. Note in this case we fail
+        //  in postEvent and throw immediately.
+        try {
+            ReadEvent bad;
+            ec->postEvent(bad);
+            CPPUNIT_FAIL("Expected QpidError.");
+        }
+        catch (const qpid::QpidError&) { }
+    }
+
+    void testWrite() {
+        WriteEvent wr(pipe[1], hello, size);
+        ec->postEvent(wr);
+        CPPUNIT_ASSERT(isNextEventOk(wr));
+        CPPUNIT_ASSERT_EQUAL(ssize_t(size), ::read(pipe[0], readBuf, size));;
+        CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf));
+    }
+
+    void testFailedWrite() {
+        WriteEvent wr(pipe[1], hello, size);
+        ::close(pipe[0]);
+        ec->postEvent(wr);
+        CPPUNIT_ASSERT(isNextEvent(wr));
+        CPPUNIT_ASSERT(wr.hasError());
+    }
+
+    void testReadWrite()
+    {
+        ReadEvent re(pipe[0], readBuf, size);
+        WriteEvent wr(pipe[1], hello, size);
+        ec->postEvent(re);
+        ec->postEvent(wr);
+        ec->getEvent();
+        ec->getEvent();
+        CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf));
+    }
+
+    void testAccept() {
+        Socket s = Socket::createTcp();
+        int port = s.listen(0, 10);
+        CPPUNIT_ASSERT(port != 0);
+
+        AcceptEvent ae(s.fd());
+        ec->postEvent(ae);
+        Socket client = Socket::createTcp();
+        client.connect("localhost", port);
+        CPPUNIT_ASSERT(isNextEvent(ae));
+        ae.dispatch();
+
+        // Verify client writes are read by the accepted descriptor.
+        char readBuf[size];
+        ReadEvent re(ae.getAcceptedDesscriptor(), readBuf, size);
+        ec->postEvent(re);
+        CPPUNIT_ASSERT_EQUAL(ssize_t(size), client.send(hello, sizeof(hello)));
+        CPPUNIT_ASSERT(isNextEvent(re));
+        re.dispatch();
+        CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf));
+    }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(EventChannelTest);
+

Propchange: 
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/posix/EventChannelTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/posix/EventChannelTest.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/posix/EventChannelThreadsTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/posix/EventChannelThreadsTest.cpp?view=auto&rev=480582
==============================================================================
--- 
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/posix/EventChannelThreadsTest.cpp 
(added)
+++ 
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/posix/EventChannelThreadsTest.cpp 
Wed Nov 29 06:36:08 2006
@@ -0,0 +1,247 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include <boost/bind.hpp>
+
+#include <qpid/sys/Socket.h>
+#include <qpid/posix/EventChannelThreads.h>
+#include <qpid_test_plugin.h>
+
+
+using namespace std;
+
+using namespace qpid::sys;
+
+const int nConnections = 5;
+const int nMessages = 10; // Messages read/written per connection.
+
+
+// Accepts + reads + writes.
+const int totalEvents = nConnections+2*nConnections*nMessages;
+
+/**
+ * Messages are numbered 0..nMessages.
+ * We count the total number of events, and the
+ * number of reads and writes for each message number.
+ */
+class TestResults : public Monitor {
+  public:
+    TestResults() : isShutdown(false), nEventsRemaining(totalEvents) {}
+
+    void countEvent() {
+        if (--nEventsRemaining == 0)
+            shutdown();
+    }
+
+    void countRead(int messageNo) {
+        ++reads[messageNo];
+        countEvent();
+    }
+
+    void countWrite(int messageNo) {
+        ++writes[messageNo];
+        countEvent();
+    }
+
+    void shutdown(const std::string& exceptionMsg = std::string()) {
+        ScopedLock lock(*this);
+        exception = exceptionMsg;
+        isShutdown = true;
+        notifyAll();
+    }
+    
+    void wait() {
+        ScopedLock lock(*this);
+        Time deadline = now() + 10*TIME_SEC; 
+        while (!isShutdown) {
+            CPPUNIT_ASSERT(Monitor::wait(deadline));
+        }
+    }
+
+    bool isShutdown;
+    std::string exception;
+    AtomicCount reads[nMessages];
+    AtomicCount writes[nMessages];
+    AtomicCount nEventsRemaining;
+};
+
+TestResults results;
+
+EventChannelThreads::shared_ptr threads;
+
+// Functor to wrap callbacks in try/catch.
+class SafeCallback {
+  public:
+    SafeCallback(Runnable& r) : callback(r.functor()) {}
+    SafeCallback(Event::Callback cb) : callback(cb) {}
+    
+    void operator()() {
+        std::string exception;
+        try {
+            callback();
+            return;
+        }
+        catch (const std::exception& e) {
+            exception = e.what();
+        }
+        catch (...) {
+            exception = "Unknown exception.";
+        }
+        results.shutdown(exception);
+    }
+
+  private:
+    Event::Callback callback;
+};
+
+/** Repost an event N times. */
+class Repost {
+  public:
+    Repost(int n) : count (n) {}
+    virtual ~Repost() {}
+    
+    void repost(Event* event) {
+        if (--count==0) {
+            delete event;
+        } else {
+            threads->postEvent(event);
+        }
+    }
+  private:
+    int count;
+};
+    
+            
+
+/** Repeating read event. */
+class TestReadEvent : public ReadEvent, public Runnable, private Repost {
+  public:
+    explicit TestReadEvent(int fd=-1) :
+        ReadEvent(fd, &value, sizeof(value), SafeCallback(*this)),
+        Repost(nMessages)
+    {}
+    
+    void run() {
+        CPPUNIT_ASSERT_EQUAL(sizeof(value), getSize());
+        CPPUNIT_ASSERT(0 <= value);
+        CPPUNIT_ASSERT(value < nMessages);
+        results.countRead(value);
+        repost(this);
+    }
+    
+  private:
+    int value;
+    ReadEvent original;
+};
+
+
+/** Fire and forget write event */
+class TestWriteEvent : public WriteEvent, public Runnable, private Repost {
+  public:
+    TestWriteEvent(int fd=-1) :
+        WriteEvent(fd, &value, sizeof(value), SafeCallback(*this)),
+        Repost(nMessages),
+        value(0)
+    {}
+    
+    void run() {
+        CPPUNIT_ASSERT_EQUAL(sizeof(int), getSize());
+        results.countWrite(value++);
+        repost(this);
+    }
+
+  private:
+    int value;
+};
+
+/** Fire-and-forget Accept event, posts reads on the accepted connection. */
+class TestAcceptEvent : public AcceptEvent, public Runnable, private Repost {
+  public:
+    TestAcceptEvent(int fd=-1) :
+        AcceptEvent(fd, SafeCallback(*this)),
+        Repost(nConnections)
+    {}
+    
+    void run() {
+        threads->postEvent(new TestReadEvent(getAcceptedDesscriptor()));
+        results.countEvent();
+        repost(this);
+    }
+};
+
+class EventChannelThreadsTest : public CppUnit::TestCase
+{
+    CPPUNIT_TEST_SUITE(EventChannelThreadsTest);
+    CPPUNIT_TEST(testThreads);
+    CPPUNIT_TEST_SUITE_END();
+
+  public:
+
+    void setUp() {
+        threads = EventChannelThreads::create(EventChannel::create());
+    }
+
+    void tearDown() {
+        threads.reset();
+    }
+
+    void testThreads()
+    {
+        Socket listener = Socket::createTcp();
+        int port = listener.listen();
+
+        // Post looping accept events, will repost nConnections times.
+        // The accept event will automatically post read events.
+        threads->postEvent(new TestAcceptEvent(listener.fd()));
+
+        // Make connections.
+        Socket connections[nConnections];
+        for (int i = 0; i < nConnections; ++i) {
+            connections[i] = Socket::createTcp();
+            connections[i].connect("localhost", port);
+        }
+
+        // Post looping write events.
+        for (int i = 0; i < nConnections; ++i) {
+            threads->postEvent(new TestWriteEvent(connections[i].fd()));
+        }
+
+        // Wait for all events to be dispatched.
+        results.wait();
+
+        if (!results.exception.empty()) CPPUNIT_FAIL(results.exception);
+        CPPUNIT_ASSERT_EQUAL(0, int(results.nEventsRemaining));
+
+        // Expect a read and write for each messageNo from each connection.
+        for (int messageNo = 0; messageNo < nMessages; ++messageNo) {
+            CPPUNIT_ASSERT_EQUAL(nConnections, int(results.reads[messageNo]));
+            CPPUNIT_ASSERT_EQUAL(nConnections, int(results.writes[messageNo]));
+        }
+
+        threads->shutdown();
+        threads->join();
+    }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(EventChannelThreadsTest);
+

Propchange: 
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/posix/EventChannelThreadsTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/posix/EventChannelThreadsTest.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to