Author: aconway
Date: Mon Jan 15 13:56:23 2007
New Revision: 496511
URL: http://svn.apache.org/viewvc?view=rev&rev=496511
Log:
* Client & broker using Requester/Responder to manage request/response IDs.
Modified:
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.h
incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h
incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connector.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connector.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQFrame.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ProtocolVersionException.h
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h
incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/MethodBodyClass.h.tmpl
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp Mon Jan 15
13:56:23 2007
@@ -23,6 +23,7 @@
#include "AMQFrame.h"
#include "DirectExchange.h"
+#include "TopicExchange.h"
#include "FanOutExchange.h"
#include "HeadersExchange.h"
#include "MessageStoreModule.h"
@@ -102,3 +103,4 @@
}} // namespace qpid::broker
+
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h Mon Jan 15
13:56:23 2007
@@ -29,6 +29,15 @@
#include <SharedObject.h>
#include <MessageStore.h>
#include <AutoDelete.h>
+#include "Requester.h"
+#include "Responder.h"
+#include <ExchangeRegistry.h>
+#include <BrokerChannel.h>
+#include <ConnectionToken.h>
+#include <DirectExchange.h>
+#include <OutputHandler.h>
+#include <ProtocolInitiation.h>
+#include <QueueRegistry.h>
namespace qpid {
namespace broker {
@@ -77,6 +86,8 @@
u_int32_t getTimeout() { return timeout; }
u_int64_t getStagingThreshold() { return stagingThreshold; }
AutoDelete& getCleaner() { return cleaner; }
+ qpid::framing::Requester& getRequester() { return requester; }
+ qpid::framing::Responder& getResponder() { return responder; }
private:
Broker(const Configuration& config);
@@ -89,6 +100,8 @@
u_int64_t stagingThreshold;
AutoDelete cleaner;
SessionHandlerFactoryImpl factory;
+ qpid::framing::Requester requester;
+ qpid::framing::Responder responder;
};
}}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.cpp?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.cpp
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.cpp Mon
Jan 15 13:56:23 2007
@@ -19,11 +19,15 @@
*
*/
#include <iostream>
-#include <SessionHandlerImpl.h>
-#include <FanOutExchange.h>
-#include <HeadersExchange.h>
-#include <TopicExchange.h>
-#include "assert.h"
+#include <assert.h>
+
+#include "SessionHandlerImpl.h"
+
+#include "FanOutExchange.h"
+#include "HeadersExchange.h"
+
+#include "Requester.h"
+#include "Responder.h"
using namespace boost;
using namespace qpid::sys;
@@ -42,6 +46,8 @@
exchanges(broker.getExchanges()),
cleaner(broker.getCleaner()),
settings(broker.getTimeout(), broker.getStagingThreshold()),
+ requester(broker.getRequester()),
+ responder(broker.getResponder()),
basicHandler(new BasicHandlerImpl(this)),
channelHandler(new ChannelHandlerImpl(this)),
connectionHandler(new ConnectionHandlerImpl(this)),
@@ -55,7 +61,7 @@
SessionHandlerImpl::~SessionHandlerImpl(){
- if (client != NULL)
+ if (client != NULL)
delete client;
}
@@ -87,51 +93,87 @@
return exchanges.get(name);
}
+void SessionHandlerImpl::handleMethod(
+ u_int16_t channel, qpid::framing::AMQBody::shared_ptr body)
+{
+ AMQMethodBody::shared_ptr method =
+ shared_polymorphic_cast<AMQMethodBody, AMQBody>(body);
+ try{
+ method->invoke(*this, channel);
+ }catch(ChannelException& e){
+ channels[channel]->close();
+ channels.erase(channel);
+ client->getChannel().close(
+ channel, e.code, e.text,
+ method->amqpClassId(), method->amqpMethodId());
+ }catch(ConnectionException& e){
+ client->getConnection().close(
+ 0, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
+ }catch(std::exception& e){
+ client->getConnection().close(
+ 0, 541/*internal error*/, e.what(),
+ method->amqpClassId(), method->amqpMethodId());
+ }
+}
+
void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
u_int16_t channel = frame->getChannel();
AMQBody::shared_ptr body = frame->getBody();
- AMQMethodBody::shared_ptr method;
-
switch(body->type())
{
case REQUEST_BODY:
- // responder.received(frame);
+ responder.received(AMQRequestBody::getData(body));
+ handleMethod(channel, body);
+ break;
case RESPONSE_BODY:
- // requester.received(frame);
- case METHOD_BODY: //
- method = dynamic_pointer_cast<AMQMethodBody, AMQBody>(body);
- try{
- method->invoke(*this, channel);
- }catch(ChannelException& e){
- channels[channel]->close();
- channels.erase(channel);
- client->getChannel().close(channel, e.code, e.text,
method->amqpClassId(), method->amqpMethodId());
- }catch(ConnectionException& e){
- client->getConnection().close(0, e.code, e.text,
method->amqpClassId(), method->amqpMethodId());
- }catch(std::exception& e){
- string error(e.what());
- client->getConnection().close(0, 541/*internal error*/, error,
method->amqpClassId(), method->amqpMethodId());
- }
+ // Must process responses before marking them received.
+ handleMethod(channel, body);
+ requester.processed(AMQResponseBody::getData(body));
+ break;
+ // TODO aconway 2007-01-15: Leftover from 0-8 support, remove.
+ case METHOD_BODY:
+ handleMethod(channel, body);
+ break;
+ case HEADER_BODY:
+ handleHeader(
+ channel, shared_polymorphic_cast<AMQHeaderBody>(body));
break;
- case HEADER_BODY:
- this->handleHeader(channel, dynamic_pointer_cast<AMQHeaderBody,
AMQBody>(body));
+ case CONTENT_BODY:
+ handleContent(
+ channel, shared_polymorphic_cast<AMQContentBody>(body));
break;
- case CONTENT_BODY:
- this->handleContent(channel, dynamic_pointer_cast<AMQContentBody,
AMQBody>(body));
- break;
-
- case HEARTBEAT_BODY:
- //channel must be 0
- this->handleHeartbeat(dynamic_pointer_cast<AMQHeartbeatBody,
AMQBody>(body));
+ case HEARTBEAT_BODY:
+ assert(channel == 0);
+ handleHeartbeat(
+ shared_polymorphic_cast<AMQHeartbeatBody>(body));
break;
}
}
+/**
+ * An OutputHandler that does request/response procssing before
+ * delgating to another OutputHandler.
+ */
+SessionHandlerImpl::Sender::Sender(
+ OutputHandler& oh, Requester& req, Responder& resp)
+ : out(oh), requester(req), responder(resp)
+{}
+
+void SessionHandlerImpl::Sender::send(AMQFrame* frame) {
+ AMQBody::shared_ptr body = frame->getBody();
+ u_int16_t type = body->type();
+ if (type == REQUEST_BODY)
+ requester.sending(AMQRequestBody::getData(body));
+ else if (type == RESPONSE_BODY)
+ responder.sending(AMQResponseBody::getData(body));
+ out.send(frame);
+}
+
void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){
- if (client == NULL)
+ if (client == 0)
{
client = new qpid::framing::AMQP_ClientProxy(context,
header->getMajor(), header->getMinor());
@@ -280,7 +322,7 @@
const string& /*routingKey*/,
const qpid::framing::FieldTable& /*arguments*/ )
{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
+ assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
@@ -335,9 +377,9 @@
Queue::shared_ptr queue = parent->getQueue(queueName, channel);
Exchange::shared_ptr exchange = parent->exchanges.get(exchangeName);
if(exchange){
-// kpvdr - cannot use this any longer as routingKey is now const
-// if(routingKey.empty() && queueName.empty()) routingKey =
queue->getName();
-// exchange->bind(queue, routingKey, &arguments);
+ // kpvdr - cannot use this any longer as routingKey is now const
+ // if(routingKey.empty() && queueName.empty()) routingKey =
queue->getName();
+ // exchange->bind(queue, routingKey, &arguments);
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ?
queue->getName() : routingKey;
exchange->bind(queue, exchangeRoutingKey, &arguments);
if(!nowait) parent->client->getQueue().bindOk(channel);
@@ -483,25 +525,25 @@
const string& /*routingKey*/,
const qpid::framing::FieldTable& /*arguments*/ )
{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
+ assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
void
SessionHandlerImpl::ChannelHandlerImpl::ok( u_int16_t /*channel*/ )
{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
+ assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
void
SessionHandlerImpl::ChannelHandlerImpl::ping( u_int16_t /*channel*/ )
{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
+ assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
void
SessionHandlerImpl::ChannelHandlerImpl::pong( u_int16_t /*channel*/ )
{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
+ assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
void
@@ -509,148 +551,149 @@
u_int16_t /*channel*/,
const string& /*channelId*/ )
{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
+ assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
// Message class method handlers
void
SessionHandlerImpl::MessageHandlerImpl::append( u_int16_t /*channel*/,
- const string& /*reference*/,
- const string& /*bytes*/ )
+ const string& /*reference*/,
+ const string& /*bytes*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::cancel( u_int16_t /*channel*/,
- const string& /*destination*/ )
+ const string& /*destination*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/,
- const string& /*reference*/,
- const string& /*identifier*/ )
+ const string&
/*reference*/,
+ const string&
/*identifier*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::close( u_int16_t /*channel*/,
- const string& /*reference*/ )
+ const string& /*reference*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::consume( u_int16_t /*channel*/,
- u_int16_t /*ticket*/,
- const string& /*queue*/,
- const string& /*destination*/,
- bool /*noLocal*/,
- bool /*noAck*/,
- bool /*exclusive*/,
- const qpid::framing::FieldTable& /*filter*/ )
+ u_int16_t /*ticket*/,
+ const string& /*queue*/,
+ const string& /*destination*/,
+ bool /*noLocal*/,
+ bool /*noAck*/,
+ bool /*exclusive*/,
+ const
qpid::framing::FieldTable& /*filter*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::empty( u_int16_t /*channel*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::get( u_int16_t /*channel*/,
- u_int16_t /*ticket*/,
- const string& /*queue*/,
- const string& /*destination*/,
- bool /*noAck*/ )
+ u_int16_t /*ticket*/,
+ const string& /*queue*/,
+ const string& /*destination*/,
+ bool /*noAck*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::offset( u_int16_t /*channel*/,
- u_int64_t /*value*/ )
+ u_int64_t /*value*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::ok( u_int16_t /*channel*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::open( u_int16_t /*channel*/,
- const string& /*reference*/ )
+ const string& /*reference*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::qos( u_int16_t /*channel*/,
- u_int32_t /*prefetchSize*/,
- u_int16_t /*prefetchCount*/,
- bool /*global*/ )
+ u_int32_t /*prefetchSize*/,
+ u_int16_t /*prefetchCount*/,
+ bool /*global*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::recover( u_int16_t /*channel*/,
- bool /*requeue*/ )
+ bool /*requeue*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::reject( u_int16_t /*channel*/,
- u_int16_t /*code*/,
- const string& /*text*/ )
+ u_int16_t /*code*/,
+ const string& /*text*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::resume( u_int16_t /*channel*/,
- const string& /*reference*/,
- const string& /*identifier*/ )
+ const string& /*reference*/,
+ const string& /*identifier*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
void
SessionHandlerImpl::MessageHandlerImpl::transfer( u_int16_t /*channel*/,
- u_int16_t /*ticket*/,
- const string& /*destination*/,
- bool /*redelivered*/,
- bool /*immediate*/,
- u_int64_t /*ttl*/,
- u_int8_t /*priority*/,
- u_int64_t /*timestamp*/,
- u_int8_t /*deliveryMode*/,
- u_int64_t /*expiration*/,
- const string& /*exchange*/,
- const string& /*routingKey*/,
- const string& /*messageId*/,
- const string& /*correlationId*/,
- const string& /*replyTo*/,
- const string& /*contentType*/,
- const string& /*contentEncoding*/,
- const string& /*userId*/,
- const string& /*appId*/,
- const string& /*transactionId*/,
- const string& /*securityToken*/,
- const qpid::framing::FieldTable& /*applicationHeaders*/,
- qpid::framing::Content /*body*/ )
+ u_int16_t /*ticket*/,
+ const string&
/*destination*/,
+ bool /*redelivered*/,
+ bool /*immediate*/,
+ u_int64_t /*ttl*/,
+ u_int8_t /*priority*/,
+ u_int64_t /*timestamp*/,
+ u_int8_t /*deliveryMode*/,
+ u_int64_t /*expiration*/,
+ const string& /*exchange*/,
+ const string& /*routingKey*/,
+ const string& /*messageId*/,
+ const string&
/*correlationId*/,
+ const string& /*replyTo*/,
+ const string&
/*contentType*/,
+ const string&
/*contentEncoding*/,
+ const string& /*userId*/,
+ const string& /*appId*/,
+ const string&
/*transactionId*/,
+ const string&
/*securityToken*/,
+ const
qpid::framing::FieldTable& /*applicationHeaders*/,
+ qpid::framing::Content
/*body*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
}
}}
+
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.h?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.h
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.h Mon
Jan 15 13:56:23 2007
@@ -24,28 +24,20 @@
#include <map>
#include <sstream>
#include <vector>
-#include <exception>
+
#include <AMQFrame.h>
#include <AMQP_ClientProxy.h>
#include <AMQP_ServerOperations.h>
-#include <AutoDelete.h>
-#include <ExchangeRegistry.h>
-#include <BrokerChannel.h>
-#include <ConnectionToken.h>
-#include <DirectExchange.h>
-#include <OutputHandler.h>
-#include <ProtocolInitiation.h>
-#include <QueueRegistry.h>
#include <sys/SessionContext.h>
#include <sys/SessionHandler.h>
#include <sys/TimeoutHandler.h>
-#include <TopicExchange.h>
#include "Broker.h"
+#include "Exception.h"
namespace qpid {
namespace broker {
-struct ChannelException : public std::exception {
+struct ChannelException : public qpid::Exception {
u_int16_t code;
string text;
ChannelException(u_int16_t _code, string _text) : code(_code), text(_text)
{}
@@ -53,7 +45,7 @@
const char* what() const throw() { return text.c_str(); }
};
-struct ConnectionException : public std::exception {
+struct ConnectionException : public qpid::Exception {
u_int16_t code;
string text;
ConnectionException(u_int16_t _code, string _text) : code(_code),
text(_text) {}
@@ -75,13 +67,25 @@
{
typedef std::map<u_int16_t, Channel*>::iterator channel_iterator;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
-
+ class Sender : public qpid::framing::OutputHandler {
+ public:
+ Sender(qpid::framing::OutputHandler&,
+ qpid::framing::Requester&, qpid::framing::Responder&);
+ void send(qpid::framing::AMQFrame* frame);
+ private:
+ OutputHandler& out;
+ qpid::framing::Requester& requester;
+ qpid::framing::Responder& responder;
+ };
+
qpid::sys::SessionContext* context;
qpid::framing::AMQP_ClientProxy* client;
QueueRegistry& queues;
ExchangeRegistry& exchanges;
AutoDelete& cleaner;
Settings settings;
+ qpid::framing::Requester& requester;
+ qpid::framing::Responder& responder;
std::auto_ptr<BasicHandler> basicHandler;
std::auto_ptr<ChannelHandler> channelHandler;
std::auto_ptr<ConnectionHandler> connectionHandler;
@@ -98,6 +102,7 @@
void handleHeader(u_int16_t channel,
qpid::framing::AMQHeaderBody::shared_ptr body);
void handleContent(u_int16_t channel,
qpid::framing::AMQContentBody::shared_ptr body);
+ void handleMethod(u_int16_t channel, qpid::framing::AMQBody::shared_ptr
body);
void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
Channel* getChannel(u_int16_t channel);
@@ -371,8 +376,6 @@
virtual TunnelHandler* getTunnelHandler(){ throw ConnectionException(540,
"Tunnel class not implemented"); }
};
-}
-}
-
+}}
#endif
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.cpp Mon Jan 15
13:56:23 2007
@@ -32,10 +32,14 @@
u_int16_t Connection::channelIdCounter;
-Connection::Connection( bool debug, u_int32_t _max_frame_size,
qpid::framing::ProtocolVersion* _version) : max_frame_size(_max_frame_size),
closed(true),
+Connection::Connection(
+ bool debug, u_int32_t _max_frame_size,
+ qpid::framing::ProtocolVersion* _version
+) : max_frame_size(_max_frame_size), closed(true),
version(_version->getMajor(),_version->getMinor())
{
- connector = new Connector(version, debug, _max_frame_size);
+ connector = new Connector(
+ version, requester, responder, debug, _max_frame_size);
}
Connection::~Connection(){
@@ -152,6 +156,16 @@
}
void Connection::received(AMQFrame* frame){
+ AMQBody::shared_ptr body = frame->getBody();
+ u_int8_t type = body->type();
+ if (type == REQUEST_BODY)
+ responder.received(AMQRequestBody::getData(body));
+ handleFrame(frame);
+ if (type == RESPONSE_BODY)
+ requester.processed(AMQResponseBody::getData(body));
+}
+
+void Connection::handleFrame(AMQFrame* frame){
u_int16_t channelId = frame->getChannel();
if(channelId == 0){
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connection.h Mon Jan 15
13:56:23 2007
@@ -37,6 +37,8 @@
#include <ClientQueue.h>
#include <ResponseHandler.h>
#include <AMQP_HighestVersion.h>
+#include "Requester.h"
+#include "Responder.h"
namespace qpid {
@@ -79,6 +81,8 @@
ResponseHandler responses;
volatile bool closed;
qpid::framing::ProtocolVersion version;
+ qpid::framing::Requester requester;
+ qpid::framing::Responder responder;
void channelException(Channel* channel, qpid::framing::AMQMethodBody*
body, QpidError& e);
void error(int code, const std::string& msg, int classid = 0, int
methodid = 0);
@@ -89,6 +93,7 @@
virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr
body);
virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr
body);
virtual void
handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
+ void handleFrame(qpid::framing::AMQFrame* frame);
public:
/**
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connector.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connector.cpp?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connector.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connector.cpp Mon Jan 15
13:56:23 2007
@@ -22,13 +22,17 @@
#include <QpidError.h>
#include <sys/Time.h>
#include "Connector.h"
+#include "Requester.h"
+#include "Responder.h"
using namespace qpid::sys;
using namespace qpid::client;
using namespace qpid::framing;
using qpid::QpidError;
-Connector::Connector(const qpid::framing::ProtocolVersion& pVersion, bool
_debug, u_int32_t buffer_size) :
+Connector::Connector(const qpid::framing::ProtocolVersion& pVersion,
+ Requester& req, Responder& resp,
+ bool _debug, u_int32_t buffer_size) :
debug(_debug),
receive_buffer_size(buffer_size),
send_buffer_size(buffer_size),
@@ -40,7 +44,10 @@
timeoutHandler(0),
shutdownHandler(0),
inbuf(receive_buffer_size),
- outbuf(send_buffer_size){ }
+ outbuf(send_buffer_size),
+ requester(req),
+ responder(resp)
+{ }
Connector::~Connector(){ }
@@ -75,7 +82,13 @@
}
void Connector::send(AMQFrame* frame){
- writeBlock(frame);
+ AMQBody::shared_ptr body = frame->getBody();
+ u_int8_t type = body->type();
+ if (type == REQUEST_BODY)
+ requester.sending(AMQRequestBody::getData(body));
+ else if (type == RESPONSE_BODY)
+ responder.sending(AMQResponseBody::getData(body));
+ writeBlock(frame);
if(debug) std::cout << "SENT: " << *frame << std::endl;
delete frame;
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connector.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connector.h?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connector.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/client/Connector.h Mon Jan 15
13:56:23 2007
@@ -34,60 +34,73 @@
#include <sys/Socket.h>
namespace qpid {
+
+namespace framing {
+
+class Requester;
+class Responder;
+
+} // namespace framing
+
namespace client {
- class Connector : public qpid::framing::OutputHandler,
- private qpid::sys::Runnable
- {
- const bool debug;
- const int receive_buffer_size;
- const int send_buffer_size;
- qpid::framing::ProtocolVersion version;
-
- bool closed;
-
- int64_t lastIn;
- int64_t lastOut;
- int64_t timeout;
- u_int32_t idleIn;
- u_int32_t idleOut;
-
- qpid::sys::TimeoutHandler* timeoutHandler;
- qpid::sys::ShutdownHandler* shutdownHandler;
- qpid::framing::InputHandler* input;
- qpid::framing::InitiationHandler* initialiser;
- qpid::framing::OutputHandler* output;
+class Connector : public qpid::framing::OutputHandler,
+ private qpid::sys::Runnable
+{
+ const bool debug;
+ const int receive_buffer_size;
+ const int send_buffer_size;
+ qpid::framing::ProtocolVersion version;
+
+ bool closed;
+
+ int64_t lastIn;
+ int64_t lastOut;
+ int64_t timeout;
+ u_int32_t idleIn;
+ u_int32_t idleOut;
+
+ qpid::sys::TimeoutHandler* timeoutHandler;
+ qpid::sys::ShutdownHandler* shutdownHandler;
+ qpid::framing::InputHandler* input;
+ qpid::framing::InitiationHandler* initialiser;
+ qpid::framing::OutputHandler* output;
- qpid::framing::Buffer inbuf;
- qpid::framing::Buffer outbuf;
+ qpid::framing::Buffer inbuf;
+ qpid::framing::Buffer outbuf;
+
+ qpid::sys::Mutex writeLock;
+ qpid::sys::Thread receiver;
- qpid::sys::Mutex writeLock;
- qpid::sys::Thread receiver;
+ qpid::sys::Socket socket;
- qpid::sys::Socket socket;
+ qpid::framing::Requester& requester;
+ qpid::framing::Responder& responder;
- void checkIdle(ssize_t status);
- void writeBlock(qpid::framing::AMQDataBlock* data);
- void writeToSocket(char* data, size_t available);
- void setSocketTimeout();
-
- void run();
- void handleClosed();
-
- public:
- Connector(const qpid::framing::ProtocolVersion& pVersion, bool debug =
false, u_int32_t buffer_size = 1024);
- virtual ~Connector();
- virtual void connect(const std::string& host, int port);
- virtual void init(qpid::framing::ProtocolInitiation* header);
- virtual void close();
- virtual void setInputHandler(qpid::framing::InputHandler* handler);
- virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler);
- virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler);
- virtual qpid::framing::OutputHandler* getOutputHandler();
- virtual void send(qpid::framing::AMQFrame* frame);
- virtual void setReadTimeout(u_int16_t timeout);
- virtual void setWriteTimeout(u_int16_t timeout);
- };
+ void checkIdle(ssize_t status);
+ void writeBlock(qpid::framing::AMQDataBlock* data);
+ void writeToSocket(char* data, size_t available);
+ void setSocketTimeout();
+
+ void run();
+ void handleClosed();
+
+ public:
+ Connector(const qpid::framing::ProtocolVersion& pVersion,
+ qpid::framing::Requester& req, qpid::framing::Responder& resp,
+ bool debug = false, u_int32_t buffer_size = 1024);
+ virtual ~Connector();
+ virtual void connect(const std::string& host, int port);
+ virtual void init(qpid::framing::ProtocolInitiation* header);
+ virtual void close();
+ virtual void setInputHandler(qpid::framing::InputHandler* handler);
+ virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler);
+ virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler);
+ virtual qpid::framing::OutputHandler* getOutputHandler();
+ virtual void send(qpid::framing::AMQFrame* frame);
+ virtual void setReadTimeout(u_int16_t timeout);
+ virtual void setWriteTimeout(u_int16_t timeout);
+};
}
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQFrame.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQFrame.h?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQFrame.h
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQFrame.h Mon Jan
15 13:56:23 2007
@@ -21,7 +21,8 @@
* under the License.
*
*/
-/*#include <qpid/framing/amqp_methods.h>*/
+#include <boost/cast.hpp>
+
#include <amqp_types.h>
#include <AMQBody.h>
#include <AMQDataBlock.h>
@@ -49,6 +50,12 @@
virtual u_int32_t size() const;
u_int16_t getChannel();
AMQBody::shared_ptr getBody();
+
+ /** Convenience template to cast the body to an expected type */
+ template <class T> boost::shared_ptr<T> castBody() {
+ assert(dynamic_cast<T*>(getBody().get()));
+ boost::static_pointer_cast<T>(getBody());
+ }
u_int32_t decodeHead(Buffer& buffer);
void decodeBody(Buffer& buffer, uint32_t size);
Modified:
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.cpp?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.cpp
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.cpp
Mon Jan 15 13:56:23 2007
@@ -55,4 +55,10 @@
return AMQRequestBody::shared_ptr(body);
}
+void AMQRequestBody::printPrefix(std::ostream& out) const {
+ out << "request(id=" << data.requestId << ",mark="
+ << data.responseMark << "): ";
+}
+
}} // namespace qpid::framing
+
Modified:
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.h?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.h
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.h
Mon Jan 15 13:56:23 2007
@@ -42,6 +42,10 @@
ResponseId responseMark;
};
+ static Data& getData(const AMQBody::shared_ptr& body) {
+ return boost::dynamic_pointer_cast<AMQRequestBody>(body)->getData();
+ }
+
static shared_ptr create(
AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
Buffer& buffer);
@@ -52,6 +56,7 @@
u_int8_t type() const { return REQUEST_BODY; }
void encode(Buffer& buffer) const;
+ Data& getData() { return data; }
RequestId getRequestId() const { return data.requestId; }
void setRequestId(RequestId id) { data.requestId=id; }
ResponseId getResponseMark() const { return data.responseMark; }
@@ -59,6 +64,7 @@
protected:
static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+16; }
+ void printPrefix(std::ostream& out) const;
private:
Data data;
Modified:
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.cpp?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.cpp
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.cpp
Mon Jan 15 13:56:23 2007
@@ -56,5 +56,10 @@
return AMQResponseBody::shared_ptr(body);
}
+void AMQResponseBody::printPrefix(std::ostream& out) const {
+ out << "response(id=" << data.responseId << ",request=" << data.requestId
+ << ",batch=" << data.batchOffset << "): ";
+}
+
}} // namespace qpid::framing
Modified:
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.h?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.h
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.h
Mon Jan 15 13:56:23 2007
@@ -46,6 +46,10 @@
u_int32_t batchOffset;
};
+ static Data& getData(const AMQBody::shared_ptr& body) {
+ return boost::dynamic_pointer_cast<AMQResponseBody>(body)->getData();
+ }
+
static shared_ptr create(
AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
Buffer& buffer);
@@ -57,12 +61,15 @@
u_int8_t type() const { return RESPONSE_BODY; }
void encode(Buffer& buffer) const;
+ Data& getData() { return data; }
ResponseId getResponseId() { return data.responseId; }
RequestId getRequestId() { return data.requestId; }
BatchOffset getBatchOffset() { return data.batchOffset; }
protected:
static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+20; }
+ void printPrefix(std::ostream& out) const;
+
private:
Data data;
};
Modified:
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ProtocolVersionException.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ProtocolVersionException.h?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
---
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ProtocolVersionException.h
(original)
+++
incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/ProtocolVersionException.h
Mon Jan 15 13:56:23 2007
@@ -27,12 +27,10 @@
#include <string>
#include <vector>
-namespace qpid
-{
-namespace framing
-{
+namespace qpid {
+namespace framing {
-class ProtocolVersionException : virtual public qpid::Exception
+class ProtocolVersionException : public qpid::Exception
{
protected:
ProtocolVersion versionFound;
@@ -49,7 +47,6 @@
virtual std::string toString() const throw();
}; // class ProtocolVersionException
-} // namespace framing
-} // namespace qpid
+}} // namespace qpid::framing
#endif //ifndef _ProtocolVersionException_
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h Mon Jan
15 13:56:23 2007
@@ -45,7 +45,7 @@
/** Called after processing a response. */
void processed(const AMQResponseBody::Data&);
-
+
private:
std::set<RequestId> requests; /** Sent but not responded to */
RequestId lastId;
Modified:
incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/MethodBodyClass.h.tmpl
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/MethodBodyClass.h.tmpl?view=diff&rev=496511&r1=496510&r2=496511
==============================================================================
--- incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/MethodBodyClass.h.tmpl
(original)
+++ incubator/qpid/branches/qpid.0-9/gentools/templ.cpp/MethodBodyClass.h.tmpl
Mon Jan 15 13:56:23 2007
@@ -69,7 +69,8 @@
inline void print(std::ostream& out) const
{
- out << "${CLASS}${METHOD}: ";
+ printPrefix(out);
+ out << "${CLASS}${METHOD}: ";
%{FLIST} ${mb_field_print}
}