Author: aconway
Date: Tue Mar 18 14:31:08 2008
New Revision: 638590

URL: http://svn.apache.org/viewvc?rev=638590&view=rev
Log:

Make AsynchIOAcceptor multi-protocol:
 - ConnectionCodec interface replaces ConnectionInputHandler and moves
   encoding/decoding out of AsynchIOAcceptor.
 - ConnectionCodec::Factory replaces ConnectionInputHandlerFactory.
 - Acceptor creates a version-specific ConnectionCodec based on the protocol header.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp   (with 
props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionCodec.cpp   
(with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionCodec.h   
(with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionCodec.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h
    
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp
    
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/amqp_framing.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h
    incubator/qpid/trunk/qpid/cpp/src/tests/MockConnectionInputHandler.h

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=638590&r1=638589&r2=638590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Tue Mar 18 14:31:08 2008
@@ -131,7 +131,6 @@
   qpid/framing/FieldValue.cpp \
   qpid/framing/FramingContent.cpp \
   qpid/framing/FrameSet.cpp \
-  qpid/framing/InitiationHandler.cpp \
   qpid/framing/ProtocolInitiation.cpp \
   qpid/framing/ProtocolVersion.cpp \
   qpid/framing/SessionState.cpp \
@@ -168,6 +167,8 @@
 libqpidbroker_la_LIBADD = libqpidcommon.la -lboost_iostreams
 libqpidbroker_la_SOURCES = \
   $(mgen_broker_cpp) \
+  qpid/amqp_0_10/Connection.h \
+  qpid/amqp_0_10/Connection.cpp \
   qpid/broker/Broker.cpp \
   qpid/broker/BrokerAdapter.cpp \
   qpid/broker/SessionAdapter.cpp \
@@ -177,6 +178,7 @@
   qpid/broker/PersistableMessage.cpp \
   qpid/broker/Bridge.cpp \
   qpid/broker/PreviewConnection.cpp \
+  qpid/broker/PreviewConnectionCodec.cpp \
   qpid/broker/PreviewConnectionHandler.cpp \
   qpid/broker/PreviewSessionHandler.cpp \
   qpid/broker/PreviewSessionManager.cpp \
@@ -286,6 +288,7 @@
   qpid/broker/BrokerSingleton.h \
   qpid/broker/Bridge.h \
   qpid/broker/PreviewConnection.h \
+  qpid/broker/PreviewConnectionCodec.h \
   qpid/broker/PreviewConnectionHandler.h \
   qpid/broker/PreviewSessionHandler.h \
   qpid/broker/PreviewSessionManager.h \
@@ -409,7 +412,6 @@
   qpid/framing/FramingContent.h \
   qpid/framing/Handler.h \
   qpid/framing/HeaderProperties.h \
-  qpid/framing/InitiationHandler.h \
   qpid/framing/Invoker.h \
   qpid/framing/InputHandler.h \
   qpid/framing/MethodContent.h \
@@ -459,6 +461,7 @@
   qpid/sys/Monitor.h \
   qpid/sys/Mutex.h \
   qpid/sys/OutputControl.h \
+  qpid/sys/ConnectionCodec.h \
   qpid/sys/OutputTask.h \
   qpid/sys/Poller.h \
   qpid/sys/Runnable.h \

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=638590&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp Tue Mar 18 
14:31:08 2008
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Connection.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace amqp_0_10 {
+
+using sys::Mutex;
+
+// Build the 0-10 codec around output control `o`. The embedded broker
+// connection receives `this` as its first constructor argument. The frame
+// queue starts open and `initialized` starts false, so the first encode()
+// writes the protocol initiation before any frame.
+Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id)
+    : frameQueueClosed(false),
+      output(o),
+      connection(this, broker, id),
+      identifier(id),
+      initialized(false)
+{
+}
+
+// Decode frames from `buffer` (`size` bytes) and hand each one to the broker
+// connection, until AMQFrame::decode() reports that it could not decode another.
+// Returns the buffer position reached, as reported by framing::Buffer.
+size_t Connection::decode(const char* buffer, size_t size) {
+    // framing::Buffer is constructed from a char* here, hence the const_cast.
+    framing::Buffer input(const_cast<char*>(buffer), size);
+    framing::AMQFrame incoming;
+    for (;;) {
+        if (!incoming.decode(input)) break;
+        QPID_LOG(trace, "RECV [" << identifier << "]: " << incoming);
+        connection.received(incoming);
+    }
+    return input.getPosition();
+}
+
+// Give the broker connection a chance to produce output (unless the queue has
+// been closed), then report whether encode() has anything to write: either the
+// protocol initiation has not been sent yet or frames are queued.
+bool Connection::canEncode() {
+    // Read the closed flag via isClosed() so it is read under frameQueueLock,
+    // like every other access to it. isClosed() releases the lock before
+    // doOutput() runs, which matters because send() takes the same lock.
+    if (!isClosed()) connection.doOutput();
+    Mutex::ScopedLock l(frameQueueLock);
+    return !initialized || !frameQueue.empty();
+}
+
+// True once close() has marked the outgoing frame queue closed.
+bool Connection::isClosed() const {
+    Mutex::ScopedLock guard(frameQueueLock);
+    const bool queueClosed = frameQueueClosed;
+    return queueClosed;
+}
+
+// Write pending output into `buffer` (capacity `size` bytes) while holding
+// frameQueueLock. On the first call the protocol initiation for getVersion()
+// is written before any frame. Queued frames are then written in order for as
+// long as the frame at the front fits in the remaining space.
+// Throws framing::ContentTooLargeException if the frame left at the front is
+// larger than `size`, i.e. larger than the whole buffer.
+// Returns the buffer position reached, as reported by framing::Buffer.
+// NOTE(review): `buffer` is declared const char* but is written to through
+// the const_cast below.
+size_t  Connection::encode(const char* buffer, size_t size) {
+    Mutex::ScopedLock l(frameQueueLock);
+    framing::Buffer out(const_cast<char*>(buffer), size);
+    // One-time protocol header, sent ahead of the first frame.
+    if (!initialized) {
+        framing::ProtocolInitiation pi(getVersion());
+        pi.encode(out);
+        initialized = true;
+    }
+    // Encode, log, then pop: the frame is logged while still at the front.
+    while (!frameQueue.empty() && (frameQueue.front().size() <= 
out.available())) {
+            frameQueue.front().encode(out);
+            QPID_LOG(trace, "SENT [" << identifier << "]: " << 
frameQueue.front());
+            frameQueue.pop();
+    }
+    // The check is against `size`, not out.available(): a frame that merely
+    // did not fit this time stays queued for a later call.
+    if (!frameQueue.empty() && frameQueue.front().size() > size)
+        throw framing::ContentTooLargeException(QPID_MSG("Could not write 
frame, too large for buffer."));
+    return out.getPosition();
+}
+
+// Forward the request to the OutputControl supplied at construction.
+void Connection::activateOutput() {
+    output.activateOutput();
+}
+
+// Closing from this end: mark the outgoing frame queue closed, under the
+// queue lock. After this, send() no longer queues frames.
+void Connection::close() {
+    Mutex::ScopedLock guard(frameQueueLock);
+    frameQueueClosed = true;
+}
+
+// Pass the closed notification on to the broker connection.
+void Connection::closed() { connection.closed(); }
+
+// Queue a copy of `f` for a later encode(), unless the queue has been closed,
+// then request output. activateOutput() is called in either case.
+void Connection::send(framing::AMQFrame& f) {
+    {
+        // Hold the lock only while the queue is touched.
+        Mutex::ScopedLock guard(frameQueueLock);
+        const bool accepting = !frameQueueClosed;
+        if (accepting) {
+            frameQueue.push(f);
+        }
+    }
+    activateOutput();
+}
+
+// Protocol version handled by this codec: major 0, minor 10.
+framing::ProtocolVersion Connection::getVersion() const {
+    const framing::ProtocolVersion handled(0,10);
+    return handled;
+}
+
+}} // namespace qpid::amqp_0_10

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h?rev=638590&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h Tue Mar 18 
14:31:08 2008
@@ -0,0 +1,62 @@
+#ifndef QPID_AMQP_0_10_CONNECTION_H
+#define QPID_AMQP_0_10_CONNECTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/ConnectionCodec.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/sys/Mutex.h"
+// NOTE(review): this quoted include appears to resolve to this header itself
+// (qpid/amqp_0_10/Connection.h); with the include guard it is then a no-op.
+#include "Connection.h"
+#include "qpid/broker/Connection.h"
+#include <queue>
+
+namespace qpid {
+namespace broker { class Broker; }
+namespace amqp_0_10 {
+
+/**
+ * Connection codec reporting protocol version 0-10.
+ *
+ * Decodes incoming bytes into frames for the embedded broker::Connection and
+ * encodes frames queued by send() into the outgoing buffer. Access to the
+ * outgoing frame queue is guarded by frameQueueLock.
+ */
+// FIXME aconway 2008-03-18: Update to 0-10.
+class Connection  : public sys::ConnectionCodec,
+                    public sys::ConnectionOutputHandler
+{
+    // NOTE(review): members are initialized in declaration order, and
+    // `connection` is constructed with `this`. The queue, flag, lock and
+    // output are declared before it, presumably so they are ready if the
+    // broker connection sends during construction - confirm before reordering.
+    std::queue<framing::AMQFrame> frameQueue; // frames queued by send(), drained by encode()
+    bool frameQueueClosed;                    // set by close(); send() then drops frames
+    mutable sys::Mutex frameQueueLock;        // mutable so isClosed() const can lock
+    sys::OutputControl& output;               // target of activateOutput()
+    broker::Connection connection; // FIXME aconway 2008-03-18: 
+    std::string identifier;                   // used in RECV/SENT trace log lines
+    bool initialized;                         // true once encode() has written the protocol header
+    
+  public:
+    Connection(sys::OutputControl&, broker::Broker&, const std::string& id);
+    size_t decode(const char* buffer, size_t size);
+    size_t encode(const char* buffer, size_t size);
+    bool isClosed() const;
+    bool canEncode();
+    void activateOutput();
+    void closed();              // connection closed by peer.
+    void close();               // closing from this end.
+    void send(framing::AMQFrame&);
+    framing::ProtocolVersion getVersion() const;
+};
+
+}} // namespace qpid::amqp_0_10
+
+#endif  /*!QPID_AMQP_0_10_CONNECTION_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=638590&r1=638589&r2=638590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Tue Mar 18 
14:31:08 2008
@@ -286,19 +286,19 @@
     return status;
 }
 
-sys::ConnectionInputHandler* Broker::connect(
+void Broker::connect(
     const std::string& host, uint16_t port,
-    sys::ConnectionInputHandlerFactory* f)
+    sys::ConnectionCodec::Factory* f)
 {
-    return getAcceptor().connect(host, port, f ? f : &factory);
+    getAcceptor().connect(host, port, f ? f : &factory);
 }
 
-sys::ConnectionInputHandler* Broker::connect(
-    const Url& url, sys::ConnectionInputHandlerFactory* f)
+void Broker::connect(
+    const Url& url, sys::ConnectionCodec::Factory* f)
 {
     url.throwIfEmpty();
     TcpAddress addr=boost::get<TcpAddress>(url[0]);
-    return connect(addr.host, addr.port, f);
+    connect(addr.host, addr.port, f);
 }
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=638590&r1=638589&r2=638590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Tue Mar 18 14:31:08 
2008
@@ -119,12 +119,10 @@
         ManagementMethod (uint32_t methodId, management::Args& args);
     
     /** Create a connection to another broker. */
-    sys::ConnectionInputHandler*
-    connect(const std::string& host, uint16_t port,
-            sys::ConnectionInputHandlerFactory* =0);
+    void connect(const std::string& host, uint16_t port,
+                 sys::ConnectionCodec::Factory* =0);
     /** Create a connection to another broker. */
-    sys::ConnectionInputHandler*
-    connect(const Url& url, sys::ConnectionInputHandlerFactory* =0);
+    void connect(const Url& url, sys::ConnectionCodec::Factory* =0);
 
   private:
     sys::Acceptor& getAcceptor() const;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=638590&r1=638589&r2=638590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Tue Mar 18 
14:31:08 2008
@@ -90,7 +90,9 @@
     adapter(*this),
     mgmtClosing(0),
     mgmtId(mgmtId_)
-{}
+{
+    initMgmt();
+}
 
 void Connection::initMgmt(bool asLink)
 {
@@ -132,12 +134,6 @@
     adapter.close(code, text, classId, methodId);
     channels.clear();
     getOutput().close();
-}
-
-void Connection::initiated(const framing::ProtocolInitiation& header) {
-    version = ProtocolVersion(header.getMajor(), header.getMinor());
-    adapter.init(header);
-    initMgmt();
 }
 
 void Connection::idleOut(){}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=638590&r1=638589&r2=638590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Tue Mar 18 
14:31:08 2008
@@ -65,12 +65,10 @@
 
     // ConnectionInputHandler methods
     void received(framing::AMQFrame& frame);
-    void initiated(const framing::ProtocolInitiation& header);
     void idleOut();
     void idleIn();
     void closed();
     bool doOutput();
-    framing::ProtocolInitiation getInitiation() { return 
framing::ProtocolInitiation(version); }
 
     void closeChannel(framing::ChannelId channel);
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp?rev=638590&r1=638589&r2=638590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp Tue Mar 
18 14:31:08 2008
@@ -19,27 +19,32 @@
  *
  */
 #include "ConnectionFactory.h"
-#include "Connection.h"
-#include "MultiVersionConnectionInputHandler.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/amqp_0_10/Connection.h"
+#include "PreviewConnectionCodec.h"
 
 namespace qpid {
 namespace broker {
 
+using framing::ProtocolVersion;
 
-ConnectionFactory::ConnectionFactory(Broker& b) : broker(b)
-{}
+ConnectionFactory::ConnectionFactory(Broker& b) : broker(b) {}
 
+ConnectionFactory::~ConnectionFactory() {}
 
-ConnectionFactory::~ConnectionFactory()
-{
-
+sys::ConnectionCodec*
+ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const 
std::string& id) {
+    if (v == ProtocolVersion(99, 0)) 
+        return new PreviewConnectionCodec(out, broker, id);
+    if (v == ProtocolVersion(0, 10))
+        return new amqp_0_10::Connection(out, broker, id);
+    return 0;
 }
 
-qpid::sys::ConnectionInputHandler*
-ConnectionFactory::create(qpid::sys::ConnectionOutputHandler* out,
-                          const std::string& id)
-{
-    return new MultiVersionConnectionInputHandler(out, broker, id);
+sys::ConnectionCodec*
+ConnectionFactory::create(sys::OutputControl& out, const std::string& id) {
+        // FIXME aconway 2008-03-18: 
+        return new PreviewConnectionCodec(out, broker, id);
 }
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h?rev=638590&r1=638589&r2=638590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h Tue Mar 
18 14:31:08 2008
@@ -21,21 +21,23 @@
 #ifndef _ConnectionFactory_
 #define _ConnectionFactory_
 
-#include "qpid/sys/ConnectionInputHandlerFactory.h"
+#include "qpid/sys/ConnectionCodec.h"
 
 namespace qpid {
 namespace broker {
 class Broker;
 
-class ConnectionFactory : public qpid::sys::ConnectionInputHandlerFactory
-{
+class ConnectionFactory : public sys::ConnectionCodec::Factory {
   public:
     ConnectionFactory(Broker& b);
             
-    virtual qpid::sys::ConnectionInputHandler*
-    create(qpid::sys::ConnectionOutputHandler* out, const std::string& id);
-            
     virtual ~ConnectionFactory();
+
+    sys::ConnectionCodec*
+    create(framing::ProtocolVersion, sys::OutputControl&, const std::string& 
id);
+
+    sys::ConnectionCodec*
+    create(sys::OutputControl&, const std::string& id);
 
   private:
     Broker& broker;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=638590&r1=638589&r2=638590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp Tue Mar 
18 14:31:08 2008
@@ -38,17 +38,6 @@
 const std::string en_US = "en_US";
 }
 
-void ConnectionHandler::init(const framing::ProtocolInitiation& header) {
-    //need to send out a protocol header back to the client
-    handler->connection.getOutput().initiated(header);
-
-    FieldTable properties;
-    string mechanisms(PLAIN);
-    string locales(en_US);
-    handler->serverMode = true;    
-    handler->client.start(properties, mechanisms, locales);
-}
-
 void ConnectionHandler::close(ReplyCode code, const string& text, ClassId 
classId, MethodId methodId)
 {
     handler->client.close(code, text, classId, methodId);
@@ -75,7 +64,15 @@
     }
 }
 
-ConnectionHandler::ConnectionHandler(Connection& connection)  : handler(new 
Handler(connection)) {}
+ConnectionHandler::ConnectionHandler(Connection& connection)  : handler(new 
Handler(connection)) {
+    FieldTable properties;
+    string mechanisms(PLAIN);
+    string locales(en_US);
+    handler->serverMode = true;
+    handler->client.start(properties, mechanisms, locales);
+}
+
+
 
 ConnectionHandler::Handler:: Handler(Connection& c) : client(c.getOutput()), 
server(c.getOutput()), 
                                                       connection(c), 
serverMode(false) {}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h?rev=638590&r1=638589&r2=638590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h Tue Mar 
18 14:31:08 2008
@@ -38,7 +38,6 @@
 
 class Connection;
 
-// TODO aconway 2007-09-18: Rename to ConnectionHandler
 class ConnectionHandler : public framing::FrameHandler
 {
     struct Handler : public 
framing::AMQP_ServerOperations::Connection010Handler, 
@@ -82,7 +81,6 @@
     std::auto_ptr<Handler> handler;
   public:
     ConnectionHandler(Connection& connection);
-    void init(const framing::ProtocolInitiation& header);
     void close(framing::ReplyCode code, const std::string& text, 
framing::ClassId classId, framing::MethodId methodId);
     void handle(framing::AMQFrame& frame);
 };

Modified: 
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp?rev=638590&r1=638589&r2=638590&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp
 (original)
+++ 
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp
 Tue Mar 18 14:31:08 2008
@@ -31,19 +31,6 @@
     Broker& _broker, 
     const std::string& _id) : linkVersion(99,0), out(_out), broker(_broker), 
id(_id) {}
 
-    
-void MultiVersionConnectionInputHandler::initiated(const 
qpid::framing::ProtocolInitiation& i)
-{
-    if (i.getMajor() == 99 && i.getMinor() == 0) {
-        handler = std::auto_ptr<ConnectionInputHandler>(new 
PreviewConnection(out, broker, id));
-    } else if (i.getMajor() == 0 && i.getMinor() == 10) {
-        handler = std::auto_ptr<ConnectionInputHandler>(new Connection(out, 
broker, id));
-    } else {
-        throw qpid::framing::InternalErrorException("Unsupported version: " + 
i.getVersion().toString());        
-    }
-    handler->initiated(i);
-}
-
 void MultiVersionConnectionInputHandler::received(qpid::framing::AMQFrame& f)
 {
     check();
@@ -67,11 +54,6 @@
     return handler.get() &&  handler->doOutput();
 }
     
-qpid::framing::ProtocolInitiation 
MultiVersionConnectionInputHandler::getInitiation()
-{
-    return qpid::framing::ProtocolInitiation(linkVersion);
-}
-
 void MultiVersionConnectionInputHandler::closed()
 {
     if (handler.get()) handler->closed();

Modified: 
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h?rev=638590&r1=638589&r2=638590&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h
 (original)
+++ 
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h
 Tue Mar 18 14:31:08 2008
@@ -44,12 +44,10 @@
     MultiVersionConnectionInputHandler(qpid::sys::ConnectionOutputHandler* 
out, Broker& broker, const std::string& id);
     virtual ~MultiVersionConnectionInputHandler() {}
 
-    void initiated(const qpid::framing::ProtocolInitiation&);
     void received(qpid::framing::AMQFrame&);
     void idleOut();
     void idleIn();
     bool doOutput();    
-    qpid::framing::ProtocolInitiation getInitiation();
     void closed();
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.cpp?rev=638590&r1=638589&r2=638590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.cpp Tue Mar 
18 14:31:08 2008
@@ -90,7 +90,9 @@
     adapter(*this),
     mgmtClosing(0),
     mgmtId(mgmtId_)
-{}
+{
+    initMgmt();
+}
 
 void PreviewConnection::initMgmt(bool asLink)
 {
@@ -132,12 +134,6 @@
     adapter.close(code, text, classId, methodId);
     channels.clear();
     getOutput().close();
-}
-
-void PreviewConnection::initiated(const framing::ProtocolInitiation& header) {
-    version = ProtocolVersion(header.getMajor(), header.getMinor());
-    adapter.init(header);
-    initMgmt();
 }
 
 void PreviewConnection::idleOut(){}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.h?rev=638590&r1=638589&r2=638590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnection.h Tue Mar 
18 14:31:08 2008
@@ -50,8 +50,7 @@
 namespace qpid {
 namespace broker {
 
-class PreviewConnection : public sys::ConnectionInputHandler, 
-                   public ConnectionState
+class PreviewConnection : public sys::ConnectionInputHandler, public 
ConnectionState
 {
   public:
     PreviewConnection(sys::ConnectionOutputHandler* out, Broker& broker, const 
std::string& mgmtId);
@@ -65,12 +64,10 @@
 
     // ConnectionInputHandler methods
     void received(framing::AMQFrame& frame);
-    void initiated(const framing::ProtocolInitiation& header);
     void idleOut();
     void idleIn();
     void closed();
     bool doOutput();
-    framing::ProtocolInitiation getInitiation() { return 
framing::ProtocolInitiation(version); }
 
     void closeChannel(framing::ChannelId channel);
 

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionCodec.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionCodec.cpp?rev=638590&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionCodec.cpp 
(added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionCodec.cpp 
Tue Mar 18 14:31:08 2008
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "PreviewConnectionCodec.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace broker {
+
+using sys::Mutex;
+
+// Build the preview codec around output control `o`. The embedded
+// PreviewConnection receives `this` as its output handler; the frame queue
+// starts open.
+PreviewConnectionCodec::PreviewConnectionCodec(sys::OutputControl& o, Broker& broker, const std::string& id)
+    : frameQueueClosed(false),
+      output(o),
+      connection(this, broker, id),
+      identifier(id)
+{
+}
+
+// Decode frames from `buffer` (`size` bytes) and hand each one to the preview
+// connection, until AMQFrame::decode() reports that it could not decode another.
+// Returns the buffer position reached, as reported by framing::Buffer.
+size_t PreviewConnectionCodec::decode(const char* buffer, size_t size) {
+    // framing::Buffer is constructed from a char* here, hence the const_cast.
+    framing::Buffer input(const_cast<char*>(buffer), size);
+    framing::AMQFrame incoming;
+    for (;;) {
+        if (!incoming.decode(input)) break;
+        QPID_LOG(trace, "RECV [" << identifier << "]: " << incoming);
+        connection.received(incoming);
+    }
+    return input.getPosition();
+}
+
+// Give the preview connection a chance to produce output (unless the queue has
+// been closed), then report whether any frames are waiting to be encoded.
+bool PreviewConnectionCodec::canEncode() {
+    // Read the closed flag via isClosed() so it is read under frameQueueLock.
+    // isClosed() releases the lock before doOutput() runs, which matters
+    // because send() takes the same lock.
+    if (!isClosed()) connection.doOutput();
+    // send(), encode() and close() all touch the queue under this lock, so
+    // the emptiness check takes it too.
+    Mutex::ScopedLock l(frameQueueLock);
+    return !frameQueue.empty();
+}
+
+// True once close() has marked the outgoing frame queue closed.
+bool PreviewConnectionCodec::isClosed() const {
+    Mutex::ScopedLock guard(frameQueueLock);
+    const bool queueClosed = frameQueueClosed;
+    return queueClosed;
+}
+
+// Write queued frames into `buffer` (capacity `size` bytes) while holding
+// frameQueueLock. Frames are written in queue order for as long as the frame
+// at the front fits in the remaining space.
+// Throws framing::ContentTooLargeException if the frame left at the front is
+// larger than `size`, i.e. larger than the whole buffer.
+// Returns the buffer position reached, as reported by framing::Buffer.
+// NOTE(review): `buffer` is declared const char* but is written to through
+// the const_cast below.
+size_t  PreviewConnectionCodec::encode(const char* buffer, size_t size) {
+    Mutex::ScopedLock l(frameQueueLock);
+    framing::Buffer out(const_cast<char*>(buffer), size);
+    // Encode, log, then pop: the frame is logged while still at the front.
+    while (!frameQueue.empty() && (frameQueue.front().size() <= 
out.available())) {
+            frameQueue.front().encode(out);
+            QPID_LOG(trace, "SENT [" << identifier << "]: " << 
frameQueue.front());
+            frameQueue.pop();
+    }
+    // The check is against `size`, not out.available(): a frame that merely
+    // did not fit this time stays queued for a later call.
+    if (!frameQueue.empty() && frameQueue.front().size() > size)
+        throw framing::ContentTooLargeException(QPID_MSG("Could not write 
frame, too large for buffer."));
+    return out.getPosition();
+}
+
+// Forward the request to the OutputControl supplied at construction.
+void PreviewConnectionCodec::activateOutput() {
+    output.activateOutput();
+}
+
+// Closing from this end: mark the outgoing frame queue closed, under the
+// queue lock. After this, send() no longer queues frames.
+void PreviewConnectionCodec::close() {
+    Mutex::ScopedLock guard(frameQueueLock);
+    frameQueueClosed = true;
+}
+
+// Pass the closed notification on to the preview connection.
+void PreviewConnectionCodec::closed() { connection.closed(); }
+
+// Queue a copy of `f` for a later encode(), unless the queue has been closed,
+// then request output. activateOutput() is called in either case.
+void PreviewConnectionCodec::send(framing::AMQFrame& f) {
+    {
+        // Hold the lock only while the queue is touched.
+        Mutex::ScopedLock guard(frameQueueLock);
+        const bool accepting = !frameQueueClosed;
+        if (accepting) {
+            frameQueue.push(f);
+        }
+    }
+    activateOutput();
+}
+
+// Protocol version handled by this codec: major 99, minor 0.
+framing::ProtocolVersion PreviewConnectionCodec::getVersion() const {
+    const framing::ProtocolVersion handled(99,0);
+    return handled;
+}
+
+}} // namespace qpid::broker

Propchange: 
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionCodec.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionCodec.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionCodec.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionCodec.h?rev=638590&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionCodec.h 
(added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionCodec.h Tue 
Mar 18 14:31:08 2008
@@ -0,0 +1,55 @@
+#ifndef QPID_BROKER_PREVIEWCONNECTIONCODEC_H
+#define QPID_BROKER_PREVIEWCONNECTIONCODEC_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/ConnectionCodec.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/sys/Mutex.h"
+#include "PreviewConnection.h"
+
+namespace qpid {
+namespace broker {
+
+class PreviewConnectionCodec  : public sys::ConnectionCodec, public 
sys::ConnectionOutputHandler {
+    std::queue<framing::AMQFrame> frameQueue;
+    bool frameQueueClosed;
+    mutable sys::Mutex frameQueueLock;
+    sys::OutputControl& output;
+    PreviewConnection connection;
+    std::string identifier;
+    
+  public:
+    PreviewConnectionCodec(sys::OutputControl&, Broker&, const std::string& 
id);
+    size_t decode(const char* buffer, size_t size);
+    size_t encode(const char* buffer, size_t size);
+    bool isClosed() const;
+    bool canEncode();
+    void activateOutput();
+    void closed();              // connection closed by peer.
+    void close();               // closing from this end.
+    void send(framing::AMQFrame&);
+    framing::ProtocolVersion getVersion() const;
+};
+
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_PREVIEWCONNECTIONCODEC_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionCodec.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionCodec.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.cpp?rev=638590&r1=638589&r2=638590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.cpp Tue Mar 18 14:31:08 2008
@@ -37,14 +37,6 @@
 const std::string en_US = "en_US";
 }
 
-void PreviewConnectionHandler::init(const framing::ProtocolInitiation& header) 
{
-    FieldTable properties;
-    string mechanisms(PLAIN);
-    string locales(en_US);
-    handler->serverMode = true;
-    handler->client.start(header.getMajor(), header.getMinor(), properties, 
mechanisms, locales);
-}
-
 void PreviewConnectionHandler::close(ReplyCode code, const string& text, 
ClassId classId, MethodId methodId)
 {
     handler->client.close(code, text, classId, methodId);
@@ -68,7 +60,13 @@
     }
 }
 
-PreviewConnectionHandler::PreviewConnectionHandler(PreviewConnection& 
connection)  : handler(new Handler(connection)) {}
+PreviewConnectionHandler::PreviewConnectionHandler(PreviewConnection& 
connection)  : handler(new Handler(connection)) {
+    FieldTable properties;
+    string mechanisms(PLAIN);
+    string locales(en_US);
+    handler->serverMode = true;
+    handler->client.start(0, 10, properties, mechanisms, locales);
+}
 
 PreviewConnectionHandler::Handler:: Handler(PreviewConnection& c) : 
client(c.getOutput()), server(c.getOutput()), 
                                                       connection(c), 
serverMode(false) {}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.h?rev=638590&r1=638589&r2=638590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewConnectionHandler.h Tue Mar 18 14:31:08 2008
@@ -81,7 +81,6 @@
     std::auto_ptr<Handler> handler;
   public:
     PreviewConnectionHandler(PreviewConnection& connection);
-    void init(const framing::ProtocolInitiation& header);
     void close(framing::ReplyCode code, const std::string& text, 
framing::ClassId classId, framing::MethodId methodId);
     void handle(framing::AMQFrame& frame);
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp?rev=638590&r1=638589&r2=638590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.cpp Tue Mar 18 14:31:08 2008
@@ -58,6 +58,9 @@
     }
 }
 
-//TODO: this should prbably be generated from the spec at some point to keep 
the version numbers up to date
+
+std::ostream& operator<<(std::ostream& o, const framing::ProtocolInitiation& 
pi) {
+    return o << int(pi.getMajor()) << "-" << int(pi.getMinor());
+}
 
 }} // namespace qpid::framing

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.h?rev=638590&r1=638589&r2=638590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolInitiation.h Tue Mar 18 14:31:08 2008
@@ -45,7 +45,11 @@
     inline uint8_t getMajor() const { return version.getMajor(); }
     inline uint8_t getMinor() const { return version.getMinor(); }
     inline ProtocolVersion getVersion() const { return version; }
+    bool operator==(ProtocolVersion v) const { return v == getVersion(); }
 };
+
+std::ostream& operator<<(std::ostream& o, const framing::ProtocolInitiation& 
pi);
+
 
 }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/amqp_framing.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/amqp_framing.h?rev=638590&r1=638589&r2=638590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/amqp_framing.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/amqp_framing.h Tue Mar 18 14:31:08 2008
@@ -28,6 +28,5 @@
 #include "AMQHeartbeatBody.h"
 #include "InputHandler.h"
 #include "OutputHandler.h"
-#include "InitiationHandler.h"
 #include "ProtocolInitiation.h"
 #include "ProtocolVersion.h"

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h?rev=638590&r1=638589&r2=638590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h Tue Mar 18 14:31:08 2008
@@ -24,13 +24,11 @@
 
 #include <stdint.h>
 #include "qpid/SharedObject.h"
+#include "ConnectionCodec.h"
 
 namespace qpid {
 namespace sys {
 
-class ConnectionInputHandlerFactory;
-class ConnectionInputHandler;
-
 class Acceptor : public qpid::SharedObject<Acceptor>
 {
   public:
@@ -38,10 +36,9 @@
     virtual ~Acceptor() = 0;
     virtual uint16_t getPort() const = 0;
     virtual std::string getHost() const = 0;
-    virtual void run(ConnectionInputHandlerFactory* factory) = 0;
-    virtual ConnectionInputHandler* connect(
-        const std::string& host, int16_t port,
-        ConnectionInputHandlerFactory* factory) = 0;
+    virtual void run(ConnectionCodec::Factory*) = 0;
+    virtual void connect(
+        const std::string& host, int16_t port, ConnectionCodec::Factory* 
codec) = 0;
 
     /** Note: this function is async-signal safe */
     virtual void shutdown() = 0;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp?rev=638590&r1=638589&r2=638590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp Tue Mar 18 14:31:08 2008
@@ -27,12 +27,8 @@
 #include "Thread.h"
 
 #include "qpid/sys/ConnectionOutputHandler.h"
-#include "qpid/sys/ConnectionInputHandler.h"
-#include "qpid/sys/ConnectionInputHandlerFactory.h"
 #include "qpid/framing/reply_exceptions.h"
-#include "qpid/framing/AMQDataBlock.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/ProtocolInitiation.h"
 #include "qpid/log/Statement.h"
 
 #include <boost/bind.hpp>
@@ -40,6 +36,7 @@
 #include <queue>
 #include <vector>
 #include <memory>
+#include <ostream>
 
 namespace qpid {
 namespace sys {
@@ -53,10 +50,8 @@
   public:
     AsynchIOAcceptor(int16_t port, int backlog, int threads);
     ~AsynchIOAcceptor() {}
-    void run(ConnectionInputHandlerFactory* factory);
-    ConnectionInputHandler* connect(
-        const std::string& host, int16_t port,
-        ConnectionInputHandlerFactory* factory);
+    void run(ConnectionCodec::Factory*);
+    void connect(const std::string& host, int16_t port, 
ConnectionCodec::Factory*);
 
     void shutdown();
         
@@ -64,13 +59,12 @@
     std::string getHost() const;
 
   private:
-    void accepted(Poller::shared_ptr, const Socket&, 
ConnectionInputHandlerFactory*);
+    void accepted(Poller::shared_ptr, const Socket&, 
ConnectionCodec::Factory*);
 };
 
 Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads)
 {
-    return
-       Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads));
+    return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads));
 }
 
 AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads) :
@@ -88,48 +82,43 @@
     { delete [] bytes;}
 };
 
-class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler {
+class AsynchIOHandler : public OutputControl {
     AsynchIO* aio;
-    ConnectionInputHandler* inputHandler;
-    std::queue<framing::AMQFrame> frameQueue;
-    Mutex frameQueueLock;
-    bool frameQueueClosed;
-    bool isInitiated;
+    ConnectionCodec::Factory* factory;
+    ConnectionCodec* codec;
     bool readError;
     std::string identifier;
     bool isClient;
 
-    void write(const framing::AMQDataBlock&);
+    void write(const framing::ProtocolInitiation&);
 
   public:
     AsynchIOHandler() :
-        inputHandler(0),
-        frameQueueClosed(false),
-        isInitiated(false),
+        aio(0),
+        factory(0),
+        codec(0),
         readError(false),
         isClient(false)
     {}
        
     ~AsynchIOHandler() {
-        if (inputHandler)
-            inputHandler->closed();
-        delete inputHandler;
+        if (codec)
+            codec->closed();
+        delete codec;
     }
 
     void setClient() { isClient = true; }
-
-    void init(AsynchIO* a, ConnectionInputHandler* h) {
+    
+    void init(AsynchIO* a, ConnectionCodec::Factory* f) {
         aio = a;
-        inputHandler = h;
+        factory = f;
         identifier = aio->getSocket().getPeerAddress();
+
     }
 
     // Output side
-    void send(framing::AMQFrame&);
     void close();
     void activateOutput();
-    void initiated(const framing::ProtocolInitiation&);
-
 
     // Input side
     void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff);
@@ -142,10 +131,8 @@
     void closedSocket(AsynchIO& aio, const Socket& s);
 };
 
-void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, 
ConnectionInputHandlerFactory* f) {
-
+void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, 
ConnectionCodec::Factory* f) {
     AsynchIOHandler* async = new AsynchIOHandler; 
-    ConnectionInputHandler* handler = f->create(async, s.getPeerAddress());
     AsynchIO* aio = new AsynchIO(s,
                                  boost::bind(&AsynchIOHandler::readbuff, 
async, _1, _2),
                                  boost::bind(&AsynchIOHandler::eof, async, _1),
@@ -153,8 +140,7 @@
                                  boost::bind(&AsynchIOHandler::closedSocket, 
async, _1, _2),
                                  boost::bind(&AsynchIOHandler::nobuffs, async, 
_1),
                                  boost::bind(&AsynchIOHandler::idle, async, 
_1));
-    async->init(aio, handler);
-
+    async->init(aio, f);
     // Give connection some buffers to use
     for (int i = 0; i < 4; i++) {
         aio->queueReadBuffer(new Buff);
@@ -171,7 +157,7 @@
     return listener.getSockname();
 }
 
-void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) {
+void AsynchIOAcceptor::run(ConnectionCodec::Factory* fact) {
     Dispatcher d(poller);
     AsynchAcceptor
         acceptor(listener,
@@ -193,13 +179,13 @@
     }
 }
     
-ConnectionInputHandler* AsynchIOAcceptor::connect(const std::string& host, 
int16_t port, ConnectionInputHandlerFactory* f)
+void AsynchIOAcceptor::connect(
+    const std::string& host, int16_t port, ConnectionCodec::Factory* f)
 {
     Socket* socket = new Socket();//Should be deleted by handle when socket 
closes
     socket->connect(host, port);
     AsynchIOHandler* async = new AsynchIOHandler; 
     async->setClient();
-    ConnectionInputHandler* handler = f->create(async, 
socket->getPeerAddress());
     AsynchIO* aio = new AsynchIO(*socket,
                                  boost::bind(&AsynchIOHandler::readbuff, 
async, _1, _2),
                                  boost::bind(&AsynchIOHandler::eof, async, _1),
@@ -207,14 +193,12 @@
                                  boost::bind(&AsynchIOHandler::closedSocket, 
async, _1, _2),
                                  boost::bind(&AsynchIOHandler::nobuffs, async, 
_1),
                                  boost::bind(&AsynchIOHandler::idle, async, 
_1));
-    async->init(aio, handler);
-
+    async->init(aio, f);
     // Give connection some buffers to use
     for (int i = 0; i < 4; i++) {
         aio->queueReadBuffer(new Buff);
     }
     aio->start(poller);
-    return handler;
 }
 
 
@@ -225,8 +209,9 @@
 }
 
 
-void AsynchIOHandler::write(const framing::AMQDataBlock& data)
+void AsynchIOHandler::write(const framing::ProtocolInitiation& data)
 {
+    QPID_LOG(debug, "SENT [" << identifier << "] INIT( " << data << ")");
     AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
     if (!buff)
         buff = new Buff;
@@ -236,68 +221,45 @@
     aio->queueWrite(buff);
 }
 
-// Output side
-void AsynchIOHandler::send(framing::AMQFrame& frame) {
-    // TODO: Need to find out if we are in the callback context,
-    // in the callback thread if so we can go further than just queuing the 
frame
-    // to be handled later
-    {
-       ScopedLock<Mutex> l(frameQueueLock);
-       // Ignore anything seen after closing
-       if (!frameQueueClosed)
-            frameQueue.push(frame);
-    }
-
-    // Activate aio for writing here
-    aio->notifyPendingWrite();
-}
-
-void AsynchIOHandler::close() {
-    ScopedLock<Mutex> l(frameQueueLock);
-    frameQueueClosed = true;
-}
-
 void AsynchIOHandler::activateOutput() {
     aio->notifyPendingWrite();
 }
 
-void AsynchIOHandler::initiated(const framing::ProtocolInitiation& pi)
-{
-    write(pi);
-}
-
 // Input side
 void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
     if (readError) {
         return;
     }
-    framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
-    if(isInitiated){
-        framing::AMQFrame frame;
-        try{
-            while(frame.decode(in)) {
-                QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
-                inputHandler->received(frame);
-            }
+    size_t decoded = 0;
+    if (codec) {                // Already initiated
+        try {
+            decoded = codec->decode(buff->bytes+buff->dataStart, 
buff->dataCount);
         }catch(const std::exception& e){
             QPID_LOG(error, e.what());
             readError = true;
             aio->queueWriteClose();
         }
     }else{
+        framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
         framing::ProtocolInitiation protocolInit;
-        if(protocolInit.decode(in)){
-            QPID_LOG(debug, "INIT [" << identifier << "]");
-            inputHandler->initiated(protocolInit);
-            isInitiated = true;
+        if (protocolInit.decode(in)) {
+            decoded = in.getPosition();
+            QPID_LOG(debug, "RECV [" << identifier << "] INIT( " << 
protocolInit << ")");
+            codec = factory->create(protocolInit.getVersion(), *this, 
identifier);
+            if (!codec) {
+                // FIXME aconway 2008-03-18: send valid version header & close 
connection.
+                // FIXME aconway 2008-03-18: exception type
+                throw Exception(
+                    QPID_MSG("Protocol version not supported: " << 
protocolInit));
+            }
         }
     }
     // TODO: unreading needs to go away, and when we can cope
     // with multiple sub-buffers in the general buffer scheme, it will
-    if (in.available() != 0) {
+    if (decoded != size_t(buff->dataCount)) {
         // Adjust buffer for used bytes and then "unread them"
-        buff->dataStart += buff->dataCount-in.available();
-        buff->dataCount = in.available();
+        buff->dataStart += decoded;
+        buff->dataCount -= decoded;
         aio->unread(buff);
     } else {
         // Give whole buffer back to aio subsystem
@@ -307,7 +269,7 @@
 
 void AsynchIOHandler::eof(AsynchIO&) {
     QPID_LOG(debug, "DISCONNECTED [" << identifier << "]");
-    inputHandler->closed();
+    if (codec) codec->closed();
     aio->queueWriteClose();
 }
 
@@ -331,70 +293,22 @@
 }
 
 void AsynchIOHandler::idle(AsynchIO&){
-    if (isClient && !isInitiated) {
-        //get & write protocol header from upper layers
-        write(inputHandler->getInitiation());
-        isInitiated = true;
+    if (isClient && codec == 0) {
+        codec = factory->create(*this, identifier);
+        write(framing::ProtocolInitiation(codec->getVersion()));
         return;
     }
-    ScopedLock<Mutex> l(frameQueueLock);
-       
-    if (frameQueue.empty()) {
-        // At this point we know that we're write idling the connection
-        // so tell the input handler to queue any available output:
-        inputHandler->doOutput();
-        //if still no frames, theres nothing to do:
-        if (frameQueue.empty()) return;
-    }
-       
-    do {
+    if (codec == 0) return;
+    while (codec->canEncode()) {
         // Try and get a queued buffer if not then construct new one
         AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
-        if (!buff)
-            buff = new Buff;
-        framing::Buffer out(buff->bytes, buff->byteCount);
-        int buffUsed = 0;
-       
-        framing::AMQFrame frame = frameQueue.front();
-        int frameSize = frame.size();
-        int framesEncoded=0;
-        while (frameSize <= int(out.available())) {
-            frameQueue.pop();
-       
-            // Encode output frame     
-            frame.encode(out);
-            ++framesEncoded;
-            buffUsed += frameSize;
-            QPID_LOG(trace, "SENT [" << identifier << "]: " << frame);
-                       
-            if (frameQueue.empty()) {
-                //if we have run out of frames, allow upper layers to
-                //generate more
-                if (!frameQueueClosed) {
-                    inputHandler->doOutput();
-                }
-                if (frameQueue.empty()) {                
-                    //if there are still no frames, we have no more to
-                    //do
-                    break;
-                }
-            }
-            frame = frameQueue.front();
-            frameSize = frame.size();
-        }
-        QPID_LOG(trace, "Writing buffer: " << buffUsed << " bytes " << 
framesEncoded << " frames ");
-
-        // If frame was egregiously large complain
-        if (frameSize > buff->byteCount)
-            throw framing::ContentTooLargeException(QPID_MSG("Could not write 
frame, too large for buffer."));
-       
-        buff->dataCount = buffUsed;
+        if (!buff) buff = new Buff;
+        size_t encoded=codec->encode(buff->bytes, buff->byteCount);
+        buff->dataCount = encoded;
         aio->queueWrite(buff);
-    } while (!frameQueue.empty());
-
-    if (frameQueueClosed) {
-        aio->queueWriteClose();
     }
+    if (codec->isClosed())
+        aio->queueWriteClose();
 }
 
 }} // namespace qpid::sys

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionCodec.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionCodec.h?rev=638590&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionCodec.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionCodec.h Tue Mar 18 14:31:08 2008
@@ -0,0 +1,80 @@
+#ifndef QPID_SYS_CONNECTION_CODEC_H
+#define QPID_SYS_CONNECTION_CODEC_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/framing/ProtocolVersion.h"
+#include "OutputControl.h"
+#include <memory>
+#include <map>
+
+namespace qpid {
+
+namespace broker { class Broker; }
+
+namespace sys {
+
+/**
+ * Interface of coder/decoder for a connection of a specific protocol
+ * version.
+ */
+class ConnectionCodec {
+  public:
+    virtual ~ConnectionCodec() {}
+
+    /** Decode from buffer, return number of bytes decoded.
+     * @return may be less than size if there was incomplete
+     * data at the end of the buffer.
+     */
+    virtual size_t decode(const char* buffer, size_t size) = 0;
+
+
+    /** Encode into buffer, return number of bytes encoded */
+    virtual size_t encode(const char* buffer, size_t size) = 0;
+
+    /** Return true if we have data to encode */
+    virtual bool canEncode() = 0;
+
+    /** Network connection was closed from other end. */
+    virtual void closed() = 0;
+    
+    virtual bool isClosed() const = 0;
+
+    virtual framing::ProtocolVersion getVersion() const = 0;
+    
+    struct Factory {
+        virtual ~Factory() {}
+
+        /** Return 0 if version unknown */
+        virtual ConnectionCodec* create(
+            framing::ProtocolVersion, OutputControl&, const std::string& id
+        ) = 0;
+
+        /** Return "preferred" codec for outbound connections. */
+        virtual ConnectionCodec* create(
+            OutputControl&, const std::string& id
+        ) = 0;
+    };
+};
+
+}} // namespace qpid::sys
+
+#endif  /*!QPID_SYS_CONNECTION_CODEC_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionCodec.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionCodec.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h?rev=638590&r1=638589&r2=638590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h Tue Mar 18 14:31:08 2008
@@ -22,8 +22,6 @@
 #define _ConnectionInputHandler_
 
 #include "qpid/framing/InputHandler.h"
-#include "qpid/framing/InitiationHandler.h"
-#include "qpid/framing/ProtocolInitiation.h"
 #include "OutputTask.h"
 #include "TimeoutHandler.h"
 
@@ -31,12 +29,10 @@
 namespace sys {
 
     class ConnectionInputHandler :
-        public qpid::framing::InitiationHandler,
         public qpid::framing::InputHandler, 
         public TimeoutHandler, public OutputTask
     {
     public:
-        virtual qpid::framing::ProtocolInitiation getInitiation() = 0;
         virtual void closed() = 0;
     };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h?rev=638590&r1=638589&r2=638590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h Tue Mar 18 14:31:08 2008
@@ -22,7 +22,6 @@
 #define _ConnectionOutputHandler_
 
 #include "qpid/framing/OutputHandler.h"
-#include "qpid/framing/InitiationHandler.h"
 #include "OutputControl.h"
 
 namespace qpid {
@@ -31,7 +30,7 @@
 /**
  * Provides the output handler associated with a connection.
  */
-class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler, 
public OutputControl, public framing::InitiationHandler
+class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler, 
public OutputControl
 {
   public:
     virtual void close() = 0;

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/MockConnectionInputHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/MockConnectionInputHandler.h?rev=638590&r1=638589&r2=638590&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/MockConnectionInputHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/MockConnectionInputHandler.h Tue Mar 18 14:31:08 2008
@@ -22,7 +22,6 @@
 #include "qpid/sys/ConnectionInputHandler.h"
 #include "qpid/sys/ConnectionInputHandlerFactory.h"
 #include "qpid/sys/Monitor.h"
-#include "qpid/framing/ProtocolInitiation.h"
 
 struct MockConnectionInputHandler : public qpid::sys::ConnectionInputHandler {
 
@@ -30,23 +29,12 @@
 
     ~MockConnectionInputHandler() {}
     
-    void initiated(const qpid::framing::ProtocolInitiation& pi) {
-        qpid::sys::Monitor::ScopedLock l(monitor);
-        init = pi;
-        setState(GOT_INIT);
-    }
-
     void received(qpid::framing::AMQFrame* framep) {
         qpid::sys::Monitor::ScopedLock l(monitor);
         frame = *framep;
         setState(GOT_FRAME);
     }
 
-    qpid::framing::ProtocolInitiation waitForProtocolInit() {        
-        waitFor(GOT_INIT);
-        return init;
-    }
-
     qpid::framing::AMQFrame waitForFrame() {        
         waitFor(GOT_FRAME);
         return frame;
@@ -65,7 +53,7 @@
     void idleIn() {}
 
   private:
-    typedef enum { START, GOT_INIT, GOT_FRAME, CLOSED } State;
+    typedef enum { START, GOT_FRAME, CLOSED } State;
 
     void setState(State s) {
         state = s;
@@ -81,7 +69,6 @@
 
     qpid::sys::Monitor  monitor;
     State state;
-    qpid::framing::ProtocolInitiation init;
     qpid::framing::AMQFrame frame;
 };
 


Reply via email to