Based on some suggestions from Fraser, I was able to write a pub/sub performance test tool that seems to run at the broker's maximum rate.
Here is my code, so that others may use it as a reference.

----- Code Start -----
//pubsub_perftest2.cpp
// Test sending binary messages through a QPID broker via the Pub/Sub method.
// This program provides a system for characterizing the performance of the AMQP broker.
//
// Command line arguments (defaults: 2048-byte messages, 100000 messages):
// ./pubsub_perftest2 <message size> <message count>
//
//Peter Fetterer <[email protected]>
//
#include <memory.h>
#include <pthread.h>
#include <stdlib.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>

#include <iostream>
#include <string>
#include <vector>

#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/messaging/Session.h>
#include <qpid/types/Variant.h>

// prototypes
void makeRandomBinaryMessage( unsigned char* buffer, int size );
int timeval_subtract (struct timeval *result, struct timeval *x, struct timeval *y );

// Arguments for worker threads
//
// One instance per thread, owned by main() and handed to publisher()/consumer()
// through pthread_create's void* argument, so it must outlive the thread.
// The constructor zeroes every member so fields a particular thread never
// sets (e.g. `start` for consumers) are not left indeterminate.
struct threadArgs_t {
    // command line params
    int message_size;    // payload size in bytes
    int message_count;   // number of messages to publish / expect
    // AMQP Connection parameters (not owned by this struct)
    const char* url;
    const char* addr;
    // name of the worker thread, used as a prefix in log output
    std::string threadName;
    // set by the worker before it returns; the worker returns its address
    int threadResult;
    // publisher() polls this until main() sets it non-zero.
    // NOTE(review): a plain int written by one thread and read by another is
    // formally a data race; a mutex (or std::atomic in C++11) would make it
    // well-defined.
    int start;

    threadArgs_t()
        : message_size(0), message_count(0), url(0), addr(0),
          threadName(), threadResult(0), start(0) {}
};

// topic producer thread worker
void *publisher( void *args ) {
    // pull args
    threadArgs_t *myArgs = (threadArgs_t *)args;

    // Qpid objects
    qpid::messaging::Connection connection( myArgs->url, "");
    qpid::messaging::Session session;
    qpid::messaging::Sender sender;
    qpid::messaging::Message pubMessage;

    // time stamps
    timeval tv_start, tv_stop, tv_delta;

    // create message buffer of size "message_size" bytes
unsigned char *txMessage_buffer = (unsigned char *)malloc( myArgs->message_size );

    // fill buffer with random stuff
    makeRandomBinaryMessage( txMessage_buffer, myArgs->message_size );

    // Start it up..
    try {
        // make connection to broker
        connection.open();
        // create a session on the broker
        session = connection.createSession();
        // create a sender on this session
        sender = session.createSender(myArgs->addr);
sender.setCapacity(1048576/(myArgs->message_size*2)); // no more than a 0.5 Mb worth of outsanding messages

        // start the do-stuff loop
        gettimeofday(&tv_start, NULL);

        int lc = 0;
        double delta_time;

        // do a loop waiting for start to be triggered
        while ( myArgs->start == 0 ) {
            sleep(1); // sleep
        }

std::cout << "Sender \"" << myArgs->threadName << "\" Starting..\n";
        for (lc=0; lc < myArgs->message_count; lc++) {
            // binary copy counter as first 4 bytes of buffer
            memcpy( (void *)txMessage_buffer, (void *)&lc, 4 );
pubMessage.setContent( (const char *)txMessage_buffer, myArgs->message_size );
            // blocks if sender capacity limit is reached.
            // -- which means the receiver(s) are not keeping up..
            sender.send( pubMessage, false );
        }
        gettimeofday(&tv_stop, NULL);
std::cout << "Sender \"" << myArgs->threadName << "\" finished; sent " << myArgs->message_count << " messages of size " << myArgs->message_size << " bytes.\n";
        timeval_subtract( &tv_delta,  &tv_stop, &tv_start);
        delta_time = tv_delta.tv_sec + ( tv_delta.tv_usec / 1e6 );
std::cout << "Sender \"" << myArgs->threadName << "\" Opeation completed in " << delta_time << " seconds\n"; std::cout << "Sender \"" << myArgs->threadName << "\" datarate " << (double)(myArgs->message_count*myArgs->message_size)/delta_time << " Bytes/second -> " << (double)(myArgs->message_count*myArgs->message_size)/delta_time * 8 << " bits/second \n";
    } catch(const std::exception& error) {
std::cout << "Sender \"" << myArgs->threadName << "\" exception: " << error.what() << std::endl;
        connection.close();
        myArgs->threadResult = -1;
        return &myArgs->threadResult;
    }
    connection.close();
    myArgs->threadResult = 0;
    return &myArgs->threadResult;
}

// topic consumer thread worker
void *consumer( void *args ) {
    // pull args
    threadArgs_t *myArgs = (threadArgs_t *)args;

    // Qpid objects
    qpid::messaging::Connection connection(myArgs->url, "");
    qpid::messaging::Session session;
    qpid::messaging::Receiver receiver;
    qpid::messaging::Message subMessage;

    // time stamps
    timeval tv_start, tv_stop, tv_delta;

    int rxCount = 0;
    int rxCount_rx = 0;
    double delta_time;

    // Initialise this outside your main loop
    unsigned int unackedCount = 0;

    // Start it up..
    try {
        // make connection to broker
        connection.open();
        // create a session on the broker
        session = connection.createSession();
        // create a receiver on this session
        receiver = session.createReceiver(myArgs->addr);
        receiver.setCapacity(1048576/(myArgs->message_size*2));

        // start the do-stuff loop
        gettimeofday(&tv_start, NULL);

        // print new line
        // std::cout << "\n";
        int running = 1;

std::cout << "Receiver \"" << myArgs->threadName << "\" has started.. \n";

        while( running ) {
            // this receiver blocks for 1 sec before giving up..
if ( receiver.fetch( subMessage, qpid::messaging::Duration::SECOND * 1 ) == false ) {
                // no message received, exit
std::cout << "Receiver \"" << myArgs->threadName << "\" says receiver timed out.. exit.. \n";
                running = 0;
            }

            if ( ( rxCount % ( myArgs->message_count / 10 ) ) == 0 ) {
                // print
std::cout << "Receiver \"" << myArgs->threadName << "\" says " << rxCount << " messages transfered..\n";
            }

// verify count, copy first 4 bytes of tx message into an int
            if (subMessage.getContentSize() > 4 ) {
memcpy( (void *)&rxCount_rx, (void *)(char *)subMessage.getContentPtr(), 4 );
                if ( rxCount_rx == (myArgs->message_count)-1 ) {
                    // last expected message received
                    running = 0;
                }
            }

            // is rxCount not what we expect..
            if ( rxCount != rxCount_rx ) {
std::cout << "Receiver \"" << myArgs->threadName << "\" Error recieved count:" << rxCount_rx << " expected " <<rxCount << std::endl;
                rxCount = rxCount_rx;
            }

            // increment rxCount for next expected count
            rxCount++;

// Try replacing your "session.acknowledge( subMessage, false ); " with this:
            // bulk acknowledge speeds stuff up..
            unackedCount++;
            if (unackedCount > receiver.getCapacity()/2) {
               session.acknowledge();
               unackedCount = 0;
            }
        }
        std::cout << "\n";
std::cout << "Receiver \"" << myArgs->threadName << "\" RxCount = " << rxCount << std::endl;
        gettimeofday(&tv_stop, NULL);
std::cout << "Receiver \"" << myArgs->threadName << "\" finished Receiveing " << myArgs->message_count << " messages of size " << myArgs->message_size << " bytes each.\n";
        timeval_subtract( &tv_delta,  &tv_stop, &tv_start);
        delta_time = tv_delta.tv_sec + ( tv_delta.tv_usec / 1e6 );
std::cout << "Receiver \"" << myArgs->threadName << "\" Opeation completed in " << delta_time << " seconds\n"; std::cout << "Receiver \"" << myArgs->threadName << "\" datarate " << ((double)(myArgs->message_count*myArgs->message_size))/delta_time << " Bytes/second -> " << ((double)((myArgs->message_count*myArgs->message_size)/delta_time)) * 8 << " bits/second \n";

    } catch(const std::exception& error) {
std::cout << "Receiver \"" << myArgs->threadName << "\" exception: " << error.what() << std::endl;
        connection.close();
        myArgs->threadResult = -1;
        return &myArgs->threadResult;
    }
    connection.close();
    myArgs->threadResult = 0;
    return &myArgs->threadResult;
}

// <program name> <message_size> <message count>
int main(int argc, char** argv) {
    // get message size from command line..
    int message_size  = atoi( argc>1 ? argv[1] : "2048" );
    // get number of messages to run..
    int message_count = atoi( argc>2 ? argv[2] : "100000" );

std::cout << "Parameters: message_size = " << message_size << "; message_count = " << message_count << " \n";

    // check input parameters
    if ( ( message_size < 4 ) || ( message_count < 1 ) ) {
        std::cout << "invalid input parameters..\n";
        return -1;
    }

    // AMQP Connection parameters
    const char* url = "amqp:tcp:127.0.0.1:5672";
const char* addr = "pubsub.topic; {create: always, node: {durable: false, type: topic}, link: {reliability: unreliable}}";

    // create threadArgs objects for each thread
    threadArgs_t sender1_args;
    threadArgs_t receiver1_args;
    threadArgs_t receiver2_args;

    // pthread objects
    pthread_t sender1, receiver1, receiver2;

    // return value storage for threads
    int threadReturn1, threadReturn2, threadReturn3;

    sender1_args.message_size = message_size;
    sender1_args.message_count = message_count;
    sender1_args.url = (char *)url;
    sender1_args.addr = (char *)addr;
    sender1_args.threadName = "sender";
    sender1_args.start = 0; // don't start right away

    receiver1_args.message_size = message_size;
    receiver1_args.message_count = message_count;
    receiver1_args.url = (char *)url;
    receiver1_args.addr = (char *)addr;
    receiver1_args.threadName = "receiver1";

    receiver2_args.message_size = message_size;
    receiver2_args.message_count = message_count;
    receiver2_args.url = (char *)url;
    receiver2_args.addr = (char *)addr;
    receiver2_args.threadName = "receiver2";

    // create threads -- 1 publisher and 2 comsumers
threadReturn1 = pthread_create( &sender1, NULL, publisher, (void*) &sender1_args ); threadReturn2 = pthread_create( &receiver1, NULL, consumer, (void*) &receiver1_args ); threadReturn3 = pthread_create( &receiver2, NULL, consumer, (void*) &receiver2_args );

    // everyone up a and waiting.. start sender
    sender1_args.start = 1;

/* Wait till threads are complete before main continues. Unless we */ /* wait we run the risk of executing an exit which will terminate */ /* the process and all threads before the threads have completed. */
    pthread_join( sender1, NULL);
    pthread_join( receiver1, NULL);
    pthread_join( receiver2, NULL);

    std::cout << "Exit Status:\n";
    std::cout << "sender1 returned: " << threadReturn1 << std::endl;
    std::cout << "receiver1 returned: " << threadReturn2 << std::endl;
    std::cout << "receiver2 returned: " << threadReturn3 << std::endl;
    std::cout << "Main Complete.. \n";
    return 0;
}


// simple method to fill a buffer with some random stuff
//
// Writes `size` pseudo-random bytes (each value 0..255) to `buffer`. rand() is
// reseeded from time(NULL) on every call, so calls within the same second
// produce identical data. This is filler data, not real randomness.
// A null buffer or a non-positive size is a no-op.
void makeRandomBinaryMessage( unsigned char* buffer, int size ) {
    if ( buffer == NULL || size <= 0 ) {
        return;
    }

    /* random seed, just using time */
    srand( static_cast<unsigned int>( time(NULL) ) );

    /* % 256 so every byte value, including 0xFF, can occur. */
    for ( int i = 0; i < size; i++ ) {
        buffer[i] = static_cast<unsigned char>( rand() % 256 );
    }
}

// figure out the difference between two timeval structures
//
// Stores x - y in *result, with result->tv_usec normalised to [0, 1000000).
// Returns 1 if the difference is negative, otherwise 0.
// *x and *y are left unmodified: the carry is applied to a local copy of *y.
int timeval_subtract (struct timeval *result, struct timeval *x, struct timeval *y )
{
    struct timeval yy = *y;

    /* Perform the carry for the later subtraction by updating the copy of y. */
    if (x->tv_usec < yy.tv_usec) {
        long nsec = (yy.tv_usec - x->tv_usec) / 1000000 + 1;
        yy.tv_usec -= 1000000 * nsec;
        yy.tv_sec += nsec;
    }
    /* >= so a difference of exactly one second is carried too. */
    if (x->tv_usec - yy.tv_usec >= 1000000) {
        long nsec = (x->tv_usec - yy.tv_usec) / 1000000;
        yy.tv_usec += 1000000 * nsec;
        yy.tv_sec -= nsec;
    }

    /* tv_usec is now non-negative. */
    result->tv_sec = x->tv_sec - yy.tv_sec;
    result->tv_usec = x->tv_usec - yy.tv_usec;

    /* Return 1 if result is negative. */
    return x->tv_sec < yy.tv_sec;
}
----- Code Stop -----

One-line build instructions, assuming the Qpid headers and libraries are on your include and library paths:
g++ pubsub_perftest2.cpp -lqpidmessaging -lpthread -o pubsub_perftest2

Output I got on an Intel(R) Core(TM) i7 CPU 960 @ 3.20GHz system (X58 chipset, triple-channel memory configuration):
Sender "sender" finished; sent 1000000 messages of size 2048 bytes.
Sender "sender" Opeation completed in 15.7477 seconds
Sender "sender" datarate 1.30051e+08 Bytes/second -> 1.04041e+09 bits/second

Receiver "receiver1" RxCount = 1000000
Receiver "receiver1" finished Receiveing 1000000 messages of size 2048 bytes each.
Receiver "receiver1" Opeation completed in 15.8164 seconds
Receiver "receiver1" datarate 1.29486e+08 Bytes/second -> 1.03589e+09 bits/second

Receiver "receiver2" RxCount = 1000000
Receiver "receiver2" finished Receiveing 1000000 messages of size 2048 bytes each.
Receiver "receiver2" Opeation completed in 15.826 seconds
Receiver "receiver2" datarate 1.29408e+08 Bytes/second -> 1.03526e+09 bits/second

--
Peter Fetterer <[email protected]>

On Sun, 13 Nov 2011 11:16:03 +0000, Fraser Adams wrote:
I skimmed through your code and noticed a couple of things.
1) You don't have receiver prefetch enabled. If you do:
receiver.setCapacity(500); // Enable receiver prefetch
after your call:
receiver = session.createReceiver(addr);
you should see a big improvement
2) Your main loop has a coupled send and receive (fetch), so
everything will go at the speed of the slowest. Qpid provides
asynchronous send and receive but your code isn't seeing the benefit
of that (it's largely behaving like request/response).
I expect part of the perceived performance issue is also due to
acking every message. Try something like:

// Initialise this outside your main loop
unsigned int unackedCount = 0;


// Try replacing your "session.acknowledge( subMessage, false ); " with this:
unackedCount++;
if (unackedCount > receiver.getCapacity()/2) {
   //cout << "auto acknowledged message #" << count << endl;
   session.acknowledge();
   unackedCount = 0;
}


The above code acknowledges when the unackedCount exceeds half the
receiver capacity.


Also note that qpid-perftest actually uses unreliable messaging,
which means that messages don't need to be acknowledged at all. You could
try an address like:

const char* addr = "pubsub.topic; {create: always, node: {type:
topic}, link: {reliability: unreliable}}";


If my syntax is correct then you should be able to avoid
acknowledgement and get a performance gain (though you risk increased
message loss).

Hope this helps a bit.
Frase




[email protected] wrote:
I am new to qpid AMQP broker.

Based on the examples provided with qpid, I have made a simple C++ program to test the pub/sub throughput of the broker. I am noticing that the throughput is very slow for some reason.

I think there is something wrong in my test code. When I run it, the broker does not seem to be doing much, and both my app and the broker are mostly in a stalled/sleeping state.

I am not sure what I am doing wrong here. I would expect that when I run the test, the broker should become busy processing messages as fast as it can. But it does not seem to be the case here.

qpid broker is running with all defaults ( qpidd -d )
When I run qpid-perftest, everything runs smoothly and I get good throughput.

-------------- Code Segment Start ---------------
//pubsub_preftest.cpp
//test sending/receiving binary messages through a QPID broker via the Pub/Sub method.
//
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Session.h>
#include <qpid/messaging/Address.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/types/Variant.h>

#include <iostream>

#include <stdlib.h>
#include <sys/time.h>
#include <time.h>
#include <memory.h>

// prototypes
void makeRandomBinaryMessage( unsigned char* buffer, int size );
int timeval_subtract (struct timeval *result, struct timeval *x, struct timeval *y );

// <program name> <message_size> <message_count>   (defaults: 2048, 10000)
//
// Single-threaded round-trip test against the broker at 127.0.0.1:5672. Each
// iteration synchronously publishes one message to a topic and then does a
// blocking fetch on a receiver subscribed to the same topic, so throughput is
// bounded by per-message round-trip latency rather than broker capacity.
// Returns 0 on success, -1 on invalid parameters, allocation failure, or a
// Qpid exception.
int main(int argc, char** argv) {

    // get message size from command line..
    int message_size  = atoi( argc>1 ? argv[1] : "2048" );
    // get number of messages to run..
    int message_count = atoi( argc>2 ? argv[2] : "10000" );

    std::cout << "Parameters: message_size = " << message_size << "; message_count = " << message_count << " \n";

    // check input parameters
    if ( ( message_size < 4 ) || ( message_count < 1 ) ) {
        std::cout << "invalid input parameters..\n";
        return -1;
    }

    // time stamps
    timeval tv_start, tv_stop, tv_delta;

    // AMQP Connection parameters
    const char* url = "amqp:tcp:127.0.0.1:5672";
    const char* addr = "pubsub.topic; {create: always, node: {type: topic}}";
    std::string connectionOptions = "";

    // Qpid objects
    qpid::messaging::Connection connection(url, connectionOptions);
    qpid::messaging::Session session;
    qpid::messaging::Receiver receiver;
    qpid::messaging::Sender sender;
    qpid::messaging::Message pubMessage;
    qpid::messaging::Message subMessage;

    // create message buffer of size "message_size" bytes; freed before every
    // return below
    unsigned char *txMessage_buffer = static_cast<unsigned char *>( malloc( message_size ) );
    if ( txMessage_buffer == NULL ) {
        std::cout << "could not allocate message buffer..\n";
        return -1;
    }
    // fill buffer with random stuff
    makeRandomBinaryMessage( txMessage_buffer, message_size );

    // Print a status update every 1% of the run. Clamp to 1: for
    // message_count < 100 the quotient is 0 and `%` by zero is undefined.
    int progressInterval = message_count / 100;
    if ( progressInterval < 1 ) {
        progressInterval = 1;
    }

    int result = 0;
    try {
        // make connection to broker
        connection.open();
        // create a session on the broker
        session = connection.createSession();
        // create a sender and a receiver on this session
        sender = session.createSender(addr);
        receiver = session.createReceiver(addr);

        // start the do-stuff loop
        gettimeofday(&tv_start, NULL);

        for ( int lc = 0; lc < message_count; lc++ ) {
            // binary copy counter as first 4 bytes of buffer
            memcpy( txMessage_buffer, &lc, 4 );
            pubMessage.setContent( reinterpret_cast<const char *>(txMessage_buffer), message_size );
            // synchronous send: returns only once the broker has completed it
            sender.send( pubMessage, true );
            // blocking fetch of the next message on the subscription
            subMessage = receiver.fetch();
            if ( ( lc % progressInterval ) == 0 ) {
                std::cout << "\x1B[1K"; // voodoo to clear line on a vt100 terminal
                std::cout << lc << "/" << message_count << " messages transfered..\n\x1BM";
            }
            session.acknowledge( subMessage, false );
        }
        std::cout << "\n";
        gettimeofday(&tv_stop, NULL);
        std::cout << "finished sending " << message_count << " messages of size " << message_size << " bytes each.\n";
        timeval_subtract( &tv_delta, &tv_stop, &tv_start );
        const double delta_time = tv_delta.tv_sec + ( tv_delta.tv_usec / 1e6 );
        std::cout << "Opeation completed in " << delta_time << " seconds\n";
    } catch(const std::exception& error) {
        std::cout << error.what() << std::endl;
        result = -1;
    }

    // Close on both paths; guarded so a failing close still frees the buffer.
    try {
        connection.close();
    } catch(const std::exception& error) {
        std::cout << error.what() << std::endl;
        result = -1;
    }
    free( txMessage_buffer );
    return result;
}

// simple method to fill a buffer with some random stuff
//
// Writes `size` pseudo-random bytes (each value 0..255) to `buffer`. rand() is
// reseeded from time(NULL) on every call, so calls within the same second
// produce identical data. This is filler data, not real randomness.
// A null buffer or a non-positive size is a no-op.
void makeRandomBinaryMessage( unsigned char* buffer, int size ) {
    if ( buffer == NULL || size <= 0 ) {
        return;
    }

    /* random seed, just using time */
    srand( static_cast<unsigned int>( time(NULL) ) );

    /* % 256 so every byte value, including 0xFF, can occur. */
    for ( int i = 0; i < size; i++ ) {
        buffer[i] = static_cast<unsigned char>( rand() % 256 );
    }
}

// figure out the difference between two timeval structures
//
// Stores x - y in *result, with result->tv_usec normalised to [0, 1000000).
// Returns 1 if the difference is negative, otherwise 0.
// *x and *y are left unmodified: the carry is applied to a local copy of *y.
int timeval_subtract (struct timeval *result, struct timeval *x, struct timeval *y )
{
  struct timeval yy = *y;

  /* Perform the carry for the later subtraction by updating the copy of y. */
  if (x->tv_usec < yy.tv_usec) {
    long nsec = (yy.tv_usec - x->tv_usec) / 1000000 + 1;
    yy.tv_usec -= 1000000 * nsec;
    yy.tv_sec += nsec;
  }
  /* >= so a difference of exactly one second is carried too. */
  if (x->tv_usec - yy.tv_usec >= 1000000) {
    long nsec = (x->tv_usec - yy.tv_usec) / 1000000;
    yy.tv_usec += 1000000 * nsec;
    yy.tv_sec -= nsec;
  }

  /* tv_usec is now non-negative. */
  result->tv_sec = x->tv_sec - yy.tv_sec;
  result->tv_usec = x->tv_usec - yy.tv_usec;

  /* Return 1 if result is negative. */
  return x->tv_sec < yy.tv_sec;
}

-------------- Code Segment End ---------------

-- Peter Fetterer (kb3gtn)
* Failure is not an option, but a standard feature to be avoided.


---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]




---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]


---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to