Author: astitcher
Date: Fri Apr 18 12:44:25 2008
New Revision: 649666
URL: http://svn.apache.org/viewvc?rev=649666&view=rev
Log:
Split AsynchIOAcceptor into IOHandler and connection control code
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
Removed:
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
Modified:
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=649666&r1=649665&r2=649666&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Apr 18 12:44:25 2008
@@ -172,7 +172,7 @@
qpid/Plugin.cpp \
qpid/Url.cpp \
qpid/sys/AggregateOutput.cpp \
- qpid/sys/AsynchIOAcceptor.cpp \
+ qpid/sys/AsynchIOHandler.cpp \
qpid/sys/Dispatcher.cpp \
qpid/sys/Runnable.cpp \
qpid/sys/SystemInfo.cpp \
@@ -259,7 +259,8 @@
qpid/management/Manageable.cpp \
qpid/management/ManagementAgent.cpp \
qpid/management/ManagementExchange.cpp \
- qpid/management/ManagementObject.cpp
+ qpid/management/ManagementObject.cpp \
+ qpid/sys/TCPIOPlugin.cpp
libqpidclient_la_LIBADD = libqpidcommon.la
libqpidclient_la_SOURCES = \
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp?rev=649666&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp Fri Apr 18 12:44:25 2008
@@ -0,0 +1,172 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "AsynchIOHandler.h"
+#include "qpid/framing/AMQP_HighestVersion.h"
+#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Buffer type used by this handler for both reads and writes.
+ * The byte array is allocated on construction and released on destruction.
+ */
+struct Buff : public AsynchIO::BufferBase {
+    // Size, in bytes, of the array backing every Buff.
+    enum { CAPACITY = 65536 };
+
+    Buff() : AsynchIO::BufferBase(new char[CAPACITY], CAPACITY) {}
+
+    ~Buff() {
+        delete [] bytes;
+    }
+};
+
+/**
+ * Create a handler for one connection.
+ *
+ * @param id label that identifies this connection in log messages
+ * @param f  factory used later to create the connection's codec
+ *
+ * The handler starts with no AsynchIO and no codec attached, with the
+ * read-error and client flags both cleared.
+ */
+AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f)
+    : identifier(id),
+      aio(0),
+      factory(f),
+      codec(0),
+      readError(false),
+      isClient(false)
+{
+}
+
+/**
+ * Notify the codec, if one was ever created, that the connection has
+ * closed and then destroy it.
+ */
+AsynchIOHandler::~AsynchIOHandler() {
+    if (codec != 0) {
+        codec->closed();
+    }
+    // Deleting a null pointer is a no-op, so this needs no guard.
+    delete codec;
+}
+
+/**
+ * Attach the AsynchIO this handler drives and give it its initial buffers.
+ *
+ * @param a        the AsynchIO used for all subsequent reads and writes
+ * @param numBuffs number of read buffers to allocate and queue
+ */
+void AsynchIOHandler::init(AsynchIO* a, int numBuffs) {
+    aio = a;
+
+    // Prime the connection with its read buffers.
+    for (int remaining = numBuffs; remaining > 0; --remaining) {
+        aio->queueReadBuffer(new Buff);
+    }
+}
+
+/**
+ * Encode a protocol initiation header into a buffer and queue that buffer
+ * for writing.
+ *
+ * @param data the protocol initiation to send
+ */
+void AsynchIOHandler::write(const framing::ProtocolInitiation& data)
+{
+    QPID_LOG(debug, "SENT [" << identifier << "] INIT(" << data << ")");
+
+    // Prefer a buffer the AsynchIO already holds; allocate only if it has none.
+    AsynchIO::BufferBase* outBuff = aio->getQueuedBuffer();
+    if (outBuff == 0) {
+        outBuff = new Buff;
+    }
+
+    framing::Buffer encoder(outBuff->bytes, outBuff->byteCount);
+    data.encode(encoder);
+    outBuff->dataCount = data.size();
+    aio->queueWrite(outBuff);
+}
+
+/** Forward an output request to the AsynchIO as a pending-write notification. */
+void AsynchIOHandler::activateOutput()
+{
+    aio->notifyPendingWrite();
+}
+
+// Input side
+/**
+ * Read callback: consume the bytes held in buff.
+ *
+ * Once a codec exists the bytes are handed to it for decoding. Before that
+ * the bytes are parsed as a protocol initiation header and, if one decodes,
+ * the factory is asked for a codec for the requested version. An exception,
+ * or a factory that returns no codec, sets readError and queues a write
+ * close.
+ *
+ * A fully consumed buffer is put back on the read queue; otherwise the
+ * buffer is adjusted past the consumed bytes and "unread".
+ */
+void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
+    // After a read error all further input is ignored.
+    // NOTE(review): buff is not handed back to aio on this path - confirm
+    // that this is intended.
+    if (readError) {
+        return;
+    }
+
+    size_t consumed = 0;
+    if (!codec) {
+        // No codec yet: the peer must first send a protocol initiation.
+        framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+        framing::ProtocolInitiation protocolInit;
+        if (protocolInit.decode(in)) {
+            consumed = in.getPosition();
+            QPID_LOG(debug, "RECV [" << identifier << "] INIT(" << protocolInit << ")");
+            try {
+                codec = factory->create(protocolInit.getVersion(), *this, identifier);
+                if (!codec) {
+                    //TODO: may still want to revise this...
+                    //send valid version header & close connection.
+                    write(framing::ProtocolInitiation(framing::highestProtocolVersion));
+                    readError = true;
+                    aio->queueWriteClose();
+                }
+            } catch (const std::exception& e) {
+                QPID_LOG(error, e.what());
+                readError = true;
+                aio->queueWriteClose();
+            }
+        }
+    } else {
+        // Already initiated: pass the bytes straight to the codec.
+        try {
+            consumed = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
+        } catch (const std::exception& e) {
+            QPID_LOG(error, e.what());
+            readError = true;
+            aio->queueWriteClose();
+        }
+    }
+
+    // TODO: unreading needs to go away, and when we can cope
+    // with multiple sub-buffers in the general buffer scheme, it will
+    if (consumed == size_t(buff->dataCount)) {
+        // Everything was used: return the whole buffer to the aio subsystem.
+        aio->queueReadBuffer(buff);
+    } else {
+        // Skip the bytes that were used and "unread" what is left.
+        buff->dataStart += consumed;
+        buff->dataCount -= consumed;
+        aio->unread(buff);
+    }
+}
+
+/**
+ * End-of-file callback: log the disconnect, notify the codec if there is
+ * one, and queue a write close.
+ */
+void AsynchIOHandler::eof(AsynchIO&) {
+    QPID_LOG(debug, "DISCONNECTED [" << identifier << "]");
+    if (codec != 0) {
+        codec->closed();
+    }
+    aio->queueWriteClose();
+}
+
+/**
+ * Socket-closed callback: final teardown for the connection.
+ *
+ * Logs a warning if writes were still queued, deletes the socket, queues
+ * the AsynchIO for deletion and finally deletes this handler.
+ *
+ * @param s the socket that was closed; it is deleted here
+ */
+void AsynchIOHandler::closedSocket(AsynchIO&, const Socket& s) {
+    // If we closed with data still to send log a warning
+    if (!aio->writeQueueEmpty()) {
+        QPID_LOG(warning, "CLOSING [" << identifier << "] unsent data (probably due to client disconnect)");
+    }
+    delete &s;
+    aio->queueForDeletion();
+    // Must stay last: no member may be touched once the handler is deleted.
+    delete this;
+}
+
+/** Disconnect callback: handled exactly like end-of-file. */
+void AsynchIOHandler::disconnect(AsynchIO& a)
+{
+    eof(a);
+}
+
+// Notifications
+/** No-buffers callback: takes no action. */
+void AsynchIOHandler::nobuffs(AsynchIO&)
+{
+    // Intentionally empty.
+}
+
+/**
+ * Idle callback: drives the output side of the connection.
+ *
+ * In client mode with no codec yet, creates the codec and sends the
+ * protocol initiation for its version. Otherwise, if a codec exists and has
+ * data to encode, fills one buffer and queues it for writing, then queues a
+ * write close once the codec reports that it is closed.
+ */
+void AsynchIOHandler::idle(AsynchIO&){
+    // Skipped once readError is set so a failed connection does not retry
+    // codec creation on every idle call.
+    if (isClient && codec == 0 && !readError) {
+        codec = factory->create(*this, identifier);
+        if (codec == 0) {
+            // readbuff() treats a null codec from the factory as a failure;
+            // do the same here instead of dereferencing the null pointer.
+            QPID_LOG(error, "[" << identifier << "] unable to create connection codec");
+            readError = true;
+            aio->queueWriteClose();
+            return;
+        }
+        write(framing::ProtocolInitiation(codec->getVersion()));
+        return;
+    }
+    if (codec == 0) return;
+    if (codec->canEncode()) {
+        // Try and get a queued buffer if not then construct new one
+        AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
+        if (!buff) buff = new Buff;
+        size_t encoded=codec->encode(buff->bytes, buff->byteCount);
+        buff->dataCount = encoded;
+        aio->queueWrite(buff);
+    }
+    if (codec->isClosed())
+        aio->queueWriteClose();
+}
+
+}} // namespace qpid::sys
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h?rev=649666&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h Fri Apr 18 12:44:25 2008
@@ -0,0 +1,70 @@
+#ifndef _sys_AsynchIOHandler_h
+#define _sys_AsynchIOHandler_h
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "OutputControl.h"
+#include "ConnectionCodec.h"
+#include "AsynchIO.h"
+
+namespace qpid {
+
+namespace framing {
+ class ProtocolInitiation;
+}
+
+namespace sys {
+
+/**
+ * Per-connection glue between an AsynchIO and a ConnectionCodec.
+ *
+ * The AsynchIO callbacks (readbuff, eof, disconnect, nobuffs, idle,
+ * closedSocket) feed received bytes to the codec and pull encoded bytes
+ * from it. The handler owns the codec and deletes it in its destructor.
+ */
+class AsynchIOHandler : public OutputControl {
+    std::string identifier;             // connection label used in log messages
+    AsynchIO* aio;                      // set by init()
+    ConnectionCodec::Factory* factory;  // creates the codec
+    ConnectionCodec* codec;             // null until created; owned by the handler
+    bool readError;                     // once set, readbuff() ignores further input
+    bool isClient;                      // set by setClient()
+
+    // Encode a protocol initiation and queue it for writing.
+    void write(const framing::ProtocolInitiation&);
+
+    // Not copyable: a copy would share codec and both destructors would
+    // delete it. Declared private and intentionally left undefined.
+    AsynchIOHandler(const AsynchIOHandler&);
+    AsynchIOHandler& operator=(const AsynchIOHandler&);
+
+  public:
+    AsynchIOHandler(std::string id, ConnectionCodec::Factory* f);
+    ~AsynchIOHandler();
+
+    // Attach the AsynchIO and queue numBuffs read buffers on it.
+    void init(AsynchIO* a, int numBuffs);
+
+    // Mark this handler as the client end of the connection.
+    void setClient() { isClient = true; }
+
+    // Output side
+    // NOTE(review): close() has no definition in AsynchIOHandler.cpp as
+    // added by this change - confirm whether it is still needed.
+    void close();
+    void activateOutput();
+
+    // Input side
+    void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff);
+    void eof(AsynchIO& aio);
+    void disconnect(AsynchIO& aio);
+
+    // Notifications
+    void nobuffs(AsynchIO& aio);
+    void idle(AsynchIO& aio);
+    void closedSocket(AsynchIO& aio, const Socket& s);
+};
+
+}} // namespace qpid::sys
+
+#endif // _sys_AsynchIOHandler_h
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp?rev=649666&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp Fri Apr 18 12:44:25 2008
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Acceptor.h"
+
+#include "AsynchIOHandler.h"
+#include "AsynchIO.h"
+
+#include <boost/bind.hpp>
+#include <memory>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Acceptor implementation built on AsynchIO: listens on a socket for
+ * incoming connections and can also open outgoing ones.
+ */
+class AsynchIOAcceptor : public Acceptor {
+    // Declaration order matters: the constructor initialises listeningPort
+    // from listener, so listener must be declared first.
+    Socket listener;
+    const uint16_t listeningPort;
+    std::auto_ptr<AsynchAcceptor> acceptor;  // created by run()
+
+  public:
+    // NOTE(review): port is taken as int16_t here but reported as uint16_t
+    // by getPort() - confirm the intended range.
+    AsynchIOAcceptor(int16_t port, int backlog);
+
+    void run(Poller::shared_ptr, ConnectionCodec::Factory*);
+    void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*);
+
+    uint16_t getPort() const;
+    std::string getHost() const;
+
+  private:
+    // Callback invoked with each newly accepted socket.
+    void accepted(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
+};
+
+/**
+ * Factory for acceptors: builds an AsynchIOAcceptor for the given port and
+ * listen backlog.
+ */
+Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog)
+{
+    Acceptor::shared_ptr created(new AsynchIOAcceptor(port, backlog));
+    return created;
+}
+
+/**
+ * Start listening on the given port with the given backlog and record the
+ * port value returned by the listening socket. No acceptor is created
+ * until run() is called.
+ */
+AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog)
+    : listeningPort(listener.listen(port, backlog)),
+      acceptor()
+{
+}
+
+/**
+ * Callback for a newly accepted socket: create a handler for it, wire the
+ * handler's callbacks into a new AsynchIO, queue four read buffers and
+ * start the AsynchIO on the poller.
+ */
+void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionCodec::Factory* f) {
+    AsynchIOHandler* handler = new AsynchIOHandler(s.getPeerAddress(), f);
+    AsynchIO* io = new AsynchIO(
+        s,
+        boost::bind(&AsynchIOHandler::readbuff, handler, _1, _2),
+        boost::bind(&AsynchIOHandler::eof, handler, _1),
+        boost::bind(&AsynchIOHandler::disconnect, handler, _1),
+        boost::bind(&AsynchIOHandler::closedSocket, handler, _1, _2),
+        boost::bind(&AsynchIOHandler::nobuffs, handler, _1),
+        boost::bind(&AsynchIOHandler::idle, handler, _1));
+    handler->init(io, 4);
+    io->start(poller);
+}
+
+
+/** Return the port recorded when the listening socket was opened. */
+uint16_t AsynchIOAcceptor::getPort() const
+{
+    // listeningPort is const, so it can be read without a lock.
+    return listeningPort;
+}
+
+/** Return the name reported by the listening socket. */
+std::string AsynchIOAcceptor::getHost() const
+{
+    return listener.getSockname();
+}
+
+/**
+ * Start accepting connections on the listening socket.
+ *
+ * Creates an AsynchAcceptor whose callback is accepted(), bound to this
+ * object, the poller and the codec factory, and starts it on the poller.
+ */
+void AsynchIOAcceptor::run(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) {
+    acceptor.reset(
+        new AsynchAcceptor(
+            listener,
+            boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact)));
+    acceptor->start(poller);
+}
+
+/**
+ * Open an outgoing connection and attach a client-mode handler to it.
+ *
+ * @param poller poller the new AsynchIO is started on
+ * @param host   host to connect to
+ * @param port   port to connect to
+ * @param f      factory used by the handler to create its codec
+ *
+ * The socket and handler are held by auto_ptr until the AsynchIO has been
+ * constructed, so neither is leaked if connecting or a later allocation
+ * throws.
+ */
+void AsynchIOAcceptor::connect(
+    Poller::shared_ptr poller,
+    const std::string& host, int16_t port,
+    ConnectionCodec::Factory* f)
+{
+    std::auto_ptr<Socket> socket(new Socket());
+    socket->connect(host, port);
+
+    std::auto_ptr<AsynchIOHandler> async(new AsynchIOHandler(socket->getPeerAddress(), f));
+    async->setClient();
+
+    AsynchIOHandler* const handler = async.get();
+    AsynchIO* aio = new AsynchIO(*socket,
+        boost::bind(&AsynchIOHandler::readbuff, handler, _1, _2),
+        boost::bind(&AsynchIOHandler::eof, handler, _1),
+        boost::bind(&AsynchIOHandler::disconnect, handler, _1),
+        boost::bind(&AsynchIOHandler::closedSocket, handler, _1, _2),
+        boost::bind(&AsynchIOHandler::nobuffs, handler, _1),
+        boost::bind(&AsynchIOHandler::idle, handler, _1));
+
+    // Hand over ownership: the handler's closedSocket() callback deletes
+    // the socket and the handler itself when the socket closes.
+    socket.release();
+    async.release();
+
+    handler->init(aio, 4);
+    aio->start(poller);
+}
+
+}} // namespace qpid::sys