Author: astitcher
Date: Fri Sep 19 07:15:54 2008
New Revision: 697101
URL: http://svn.apache.org/viewvc?rev=697101&view=rev
Log:
RDMA bugfixes:
- Changed Rdma connection creation to allocate all necessary buffer memory
immediately. As a result, no buffer allocations (which can fail) happen later,
so a connection, once accepted, won't fail because of a lack of locked memory.
- Fixed connection logic so that we reject a new connection if we can't create
the necessary handlers (this includes the case of not enough locked memory),
rather than killing the entire broker.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp?rev=697101&r1=697100&r2=697101&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp Fri Sep 19
07:15:54 2008
@@ -305,6 +305,7 @@
Mutex::ScopedLock l(lock);
identifier = id;
aio = a;
+ assert(aio->bufferAvailable());
newBuffer();
}
void RdmaConnector::Writer::handle(framing::AMQFrame& frame) {
@@ -346,7 +347,7 @@
if (lastEof==0)
return;
size_t bytesWritten = 0;
- while (aio->writable() && !frames.empty()) {
+ while (aio->writable() && aio->bufferAvailable() && !frames.empty()) {
const AMQFrame* frame = &frames.front();
uint32_t size = frame->size();
while (size <= encode.available()) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp?rev=697101&r1=697100&r2=697101&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp Fri Sep 19
07:15:54 2008
@@ -96,7 +96,7 @@
void RdmaIOHandler::write(const framing::ProtocolInitiation& data)
{
- QPID_LOG(debug, "SENT [" << identifier << "] INIT(" << data << ")");
+ QPID_LOG(debug, "Rdma: SENT [" << identifier << "] INIT(" << data << ")");
Rdma::Buffer* buff = aio->getBuffer();
framing::Buffer out(buff->bytes, buff->byteCount);
data.encode(out);
@@ -113,7 +113,9 @@
}
void RdmaIOHandler::idle(Rdma::AsynchIO&) {
- if (!aio->writable()) {
+ // TODO: Shouldn't need this test as idle() should only ever be called when
+ // the connection is writable anyway
+ if ( !(aio->writable() && aio->bufferAvailable()) ) {
return;
}
if (isClient && codec == 0) {
@@ -138,7 +140,7 @@
}
void RdmaIOHandler::full(Rdma::AsynchIO&) {
- QPID_LOG(debug, "buffer full [" << identifier << "]");
+ QPID_LOG(debug, "Rdma: buffer full [" << identifier << "]");
}
// The logic here is subtly different from TCP as RDMA is message oriented
@@ -163,7 +165,7 @@
framing::ProtocolInitiation protocolInit;
if (protocolInit.decode(in)) {
decoded = in.getPosition();
- QPID_LOG(debug, "RECV [" << identifier << "] INIT(" <<
protocolInit << ")");
+ QPID_LOG(debug, "Rdma: RECV [" << identifier << "] INIT(" <<
protocolInit << ")");
try {
codec = factory->create(protocolInit.getVersion(), *this,
identifier);
if (!codec) {
@@ -231,19 +233,28 @@
bool RdmaIOProtocolFactory::request(Rdma::Connection::intrusive_ptr& ci, const
Rdma::ConnectionParams& cp,
ConnectionCodec::Factory* f) {
- RdmaIOHandler* async = new RdmaIOHandler(ci, f);
- Rdma::AsynchIO* aio =
- new Rdma::AsynchIO(ci->getQueuePair(),
- cp.maxRecvBufferSize, cp.initialXmitCredit,
Rdma::DEFAULT_WR_ENTRIES,
- boost::bind(&RdmaIOHandler::readbuff, async, _1, _2),
- boost::bind(&RdmaIOHandler::idle, async, _1),
- 0, // boost::bind(&RdmaIOHandler::full, async, _1),
- boost::bind(&RdmaIOHandler::error, async, _1));
- async->init(aio);
-
- // Record aio so we can get it back from a connection
- ci->addContext(async);
- return true;
+ try {
+ RdmaIOHandler* async = new RdmaIOHandler(ci, f);
+ Rdma::AsynchIO* aio =
+ new Rdma::AsynchIO(ci->getQueuePair(),
+ cp.maxRecvBufferSize, cp.initialXmitCredit,
Rdma::DEFAULT_WR_ENTRIES,
+ boost::bind(&RdmaIOHandler::readbuff, async, _1, _2),
+ boost::bind(&RdmaIOHandler::idle, async, _1),
+ 0, // boost::bind(&RdmaIOHandler::full, async, _1),
+ boost::bind(&RdmaIOHandler::error, async, _1));
+ async->init(aio);
+
+ // Record aio so we can get it back from a connection
+ ci->addContext(async);
+ return true;
+ } catch (const Rdma::Exception& e) {
+ QPID_LOG(error, "Rdma: Cannot accept new connection (Rdma excepion): "
<< e.what());
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Rdma: Cannot accept new connection (unknown
exception): " << e.what());
+ }
+
+ // If we get here we caught an exception so reject connection
+ return false;
}
void RdmaIOProtocolFactory::connectionError(Rdma::Connection::intrusive_ptr&,
Rdma::ErrorType) {
@@ -312,7 +323,7 @@
string port = ss.str();
int n = ::getaddrinfo(host.c_str(), port.c_str(), &hints, &res);
if (n<0) {
- throw Exception(QPID_MSG("Cannot resolve " << host << ": " <<
::gai_strerror(n)));
+ throw Exception(QPID_MSG("Rdma: Cannot resolve " << host << ": " <<
::gai_strerror(n)));
}
Rdma::Connector c(
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp?rev=697101&r1=697100&r2=697101&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp Fri Sep 19
07:15:54 2008
@@ -62,11 +62,21 @@
// Prepost some recv buffers before we go any further
for (int i = 0; i<recvBufferCount; ++i) {
+ // Allocate recv buffer
Buffer* b = qp->createBuffer(bufferSize);
buffers.push_front(b);
b->dataCount = b->byteCount;
qp->postRecv(b);
}
+
+ for (int i = 0; i<xmitBufferCount; ++i) {
+ // Allocate xmit buffer
+ Buffer* b = qp->createBuffer(bufferSize);
+ buffers.push_front(b);
+ bufferQueue.push_front(b);
+ b->dataCount = 0;
+ b->dataStart = 0;
+ }
}
AsynchIO::~AsynchIO() {
@@ -378,18 +388,12 @@
Buffer* AsynchIO::getBuffer() {
qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
- if (bufferQueue.empty()) {
- Buffer* b = qp->createBuffer(bufferSize);
- buffers.push_front(b);
- return b;
- } else {
- Buffer* b = bufferQueue.front();
- bufferQueue.pop_front();
- b->dataCount = 0;
- b->dataStart = 0;
- return b;
- }
-
+ assert(!bufferQueue.empty());
+ Buffer* b = bufferQueue.front();
+ bufferQueue.pop_front();
+ b->dataCount = 0;
+ b->dataStart = 0;
+ return b;
}
void AsynchIO::returnBuffer(Buffer* b) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h?rev=697101&r1=697100&r2=697101&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h Fri Sep 19
07:15:54 2008
@@ -65,6 +65,9 @@
ErrorCallback errorCallback;
public:
+ // TODO: Instead of specifying a buffer size specify the amount of memory the AsynchIO class can use
+ // for buffers both read and write (allocate half to each up front) and fail if we cannot allocate that much
+ // locked memory
AsynchIO(
QueuePair::intrusive_ptr q,
int size,
@@ -78,6 +81,7 @@
void start(qpid::sys::Poller::shared_ptr poller);
bool writable() const;
+ bool bufferAvailable() const;
void queueWrite(Buffer* buff);
void notifyPendingWrite();
void queueWriteClose();
@@ -109,6 +113,9 @@
return outstandingWrites;
}
+ inline bool AsynchIO::bufferAvailable() const {
+ return !bufferQueue.empty();
+ }
// These are the parameters necessary to start the conversation
// * Each peer HAS to allocate buffers of the size of the maximum receive
from its peer
// * Each peer HAS to know the initial "credit" it has for transmitting to
its peer