I skimmed through your code and noticed a couple of things.
1) You don't have receiver prefetch enabled. If you add:
receiver.setCapacity(500); // Enable receiver prefetch
after your call:
receiver = session.createReceiver(addr);
you should see a big improvement.
2) Your main loop has a coupled send and receive (fetch), so everything
will go at the speed of the slowest. Qpid provides asynchronous send and
receive but your code isn't seeing the benefit of that (it's largely
behaving like request/response).
I expect part of the perceived performance issue is also due to acking
every message. Try something like:
// Initialise this outside your main loop
unsigned int unackedCount = 0;
// Try replacing your "session.acknowledge( subMessage, false ); " with
this:
unackedCount++;
if (unackedCount > receiver.getCapacity()/2) {
//cout << "auto acknowledged message #" << count << endl;
session.acknowledge();
unackedCount = 0;
}
The above code acknowledges when the unackedCount exceeds half the
receiver capacity.
Also note that qpid-perftest actually uses unreliable messaging, which
means that messages don't need to be acknowledged at all. You could try an
address like:
const char* addr = "pubsub.topic; {create: always, node: {type: topic},
link: {reliability: unreliable}}";
If my syntax is correct then you should be able to avoid acknowledgement
and get a performance gain (though you risk increased message loss).
Hope this helps a bit.
Frase
[email protected] wrote:
I am new to qpid AMQP broker.
Based on the examples provided with qpid, I have made a simple cpp
program to test the pub/sub throughput on the broker. I am noticing
that the throughput is very slow for some reason.
I think this is something wrong in my test code. When I run my test
code, the broker does not seem to be doing much and my app and the
broker are mostly in a stalled/sleep state.
I am not sure what I am doing wrong here. I would expect that when I
run the test, the broker should become busy processing messages as
fast as it can. But it does not seem to be the case here.
qpid broker is running with all defaults ( qpidd -d )
When I run qpid-perftest everything seems to be running smoothly and I
get good throughput.
-------------- Code Segment Start ---------------
//pubsub_preftest.cpp
//test sending/receiving binary messages through a QPID broker via
//Pub/Sub method.
//
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Session.h>
#include <qpid/messaging/Address.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/types/Variant.h>
#include <iostream>
#include <stdlib.h>
#include <sys/time.h>
#include <time.h>
#include <memory.h>
// prototypes
// Fills the first `size` bytes of `buffer` with rand()-generated values
// (defined further down in this file).
void makeRandomBinaryMessage( unsigned char* buffer, int size );
// Stores the difference x - y in *result; the definition further down
// returns 1 when that difference is negative and 0 otherwise.
int timeval_subtract (struct timeval *result, struct timeval *x,
struct timeval *y );
int main(int argc, char** argv) {
// get message size from command line..
int message_size = atoi( argc>1 ? argv[1] : "2048" );
// get number of messages to run..
int message_count = atoi( argc>2 ? argv[2] : "10000" );
std::cout << "Parameters: message_size = " << message_size << ";
message_count = " << message_count << " \n";
// check input parameters
if ( ( message_size < 4 ) || ( message_count < 1 ) ) {
std::cout << "invalid input parameters..\n";
return -1;
}
// time stamps
timeval tv_start, tv_stop, tv_delta;
// create message buffer of size "message_size" bytes
unsigned char *txMessage_buffer = (unsigned char *)malloc(
message_size );
// fill buffer with random stuff
makeRandomBinaryMessage( txMessage_buffer, message_size );
// AMQP Connection parameters
const char* url = "amqp:tcp:127.0.0.1:5672";
const char* addr = "pubsub.topic; {create: always, node: {type:
topic}}";
std::string connectionOptions = "";
// Qpid objects
qpid::messaging::Connection connection(url, connectionOptions);
qpid::messaging::Session session;
qpid::messaging::Receiver receiver;
qpid::messaging::Sender sender;
qpid::messaging::Message pubMessage;
qpid::messaging::Message subMessage;
// loop counter
int lc = 0;
double delta_time;
// Start it up..
try {
// make connection to broker
connection.open();
// create a session on the broker
session = connection.createSession();
// create a sender on this session
sender = session.createSender(addr);
// create a receiver on this session
receiver = session.createReceiver(addr);
// start the do-stuff loop
gettimeofday(&tv_start, NULL);
// print new line
//std::cout << "\n";
for (lc=0; lc < message_count; lc++) {
// binary copy counter as first 4 bytes of buffer
memcpy( (void *)txMessage_buffer, (void *)&lc, 4 );
pubMessage.setContent( (const char *)txMessage_buffer,
message_size );
sender.send( pubMessage, true );
// receive message from broker
subMessage = receiver.fetch();
// print status update every 1%
if ( ( lc % ( message_count / 100 ) ) == 0 ) {
// print
std::cout << "\x1B[1K"; // voodoo to clear line on a
vt100 terminal
std::cout << lc << "/" << message_count << " messages
transfered..\n\x1BM";
}
session.acknowledge( subMessage, false );
}
std::cout << "\n";
gettimeofday(&tv_stop, NULL);
std::cout << "finished sending " << message_count << "
messages of size " << message_size << " bytes each.";
timeval_subtract( &tv_delta, &tv_stop, &tv_start);
delta_time = tv_delta.tv_sec + ( tv_delta.tv_usec / 1e6 );
std::cout << "Opeation completed in " << delta_time << "
seconds\n";
} catch(const std::exception& error) {
std::cout << error.what() << std::endl;
connection.close();
return -1;
}
connection.close();
return 0;
}
// Fill a buffer with pseudo-random bytes.
//
//   buffer - destination; must have room for at least `size` bytes
//   size   - number of bytes to write; values <= 0 leave the buffer untouched
//
// The generator is re-seeded from the wall clock on every call, so two calls
// made within the same second produce the same byte sequence.
void makeRandomBinaryMessage( unsigned char* buffer, int size ) {
    /* random seed, just using time */
    srand ( (unsigned int)time(NULL) );
    /* now generate random value to fill buffer */
    /* note: this data is not very random, but random enough for this. */
    for (int i = 0; i < size; i++) {
        /* modulo 256 so that every byte value 0x00..0xFF can occur */
        buffer[i] = static_cast<unsigned char>( rand() % 256 );
    }
}
// Compute the difference between two timeval structures: *result = *x - *y.
//
//   result - receives the difference; may alias x or y
//   x      - minuend (the later time in the usual case)
//   y      - subtrahend; it is only read, the caller's value is not modified
//
// Returns 1 if the difference is negative, 0 otherwise. For normalised inputs
// (0 <= tv_usec < 1000000) the stored tv_usec is always in [0, 999999].
int timeval_subtract (struct timeval *result, struct timeval *x,
                      struct timeval *y )
{
    /* Do the carry arithmetic on a private copy so *y is left untouched. */
    struct timeval sub = *y;
    /* Borrow whole seconds when x has fewer microseconds than y. */
    if (x->tv_usec < sub.tv_usec) {
        long carry = (sub.tv_usec - x->tv_usec) / 1000000 + 1;
        sub.tv_usec -= 1000000 * carry;
        sub.tv_sec += carry;
    }
    /* Fold surplus microseconds back into seconds; >= keeps tv_usec < 1000000. */
    if (x->tv_usec - sub.tv_usec >= 1000000) {
        long carry = (x->tv_usec - sub.tv_usec) / 1000000;
        sub.tv_usec += 1000000 * carry;
        sub.tv_sec -= carry;
    }
    /* Decide the sign before writing *result, in case result aliases x. */
    const int negative = x->tv_sec < sub.tv_sec;
    /* After the carries the microsecond difference is non-negative. */
    const long diff_usec = x->tv_usec - sub.tv_usec;
    result->tv_sec = x->tv_sec - sub.tv_sec;
    result->tv_usec = diff_usec;
    return negative;
}
-------------- Code Segment End ---------------
--
Peter Fetterer (kb3gtn)
* Failure is not an option, but a standard feature to be avoided.
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]