In the given test project we have a pub/sub pattern and a router/dealer
pattern. In this test we send 500 multipart messages over pub/sub
("SEQ 1"), then 500 multipart messages over router/dealer ("SEQ 2", to do
something else in between, not strictly required), and then another 500
multipart messages over pub/sub ("SEQ 3").

There is also a flag "ZMQ_RAMP_PROBLEM" that is needed to simulate our
problem: a latency ramp at the beginning of "SEQ 1" up to approximately
message 500, where it breaks down, and which shows up again from
approximately message 1000 on in "SEQ 3" of the result log. If the #define
is deactivated, the messages are sent back-to-back and very fast; activating
the #define adds a short busy-wait after every multipart message and brings
up the ramp until message 500. For better understanding there is also a
resultfile.log file in which all the timings of the pub/sub connection are
logged.
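Each line of that log has the form written by FlushLogFile() in the code
below, for example (values invented here for illustration):

    Loop    42,  12.34 [us],     0

i.e. loop index, measured send latency in microseconds, and a buffer-size
slot that stays 0 in this test.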

There is also a flag "SPINNING_RECEIVE" that turns on spinning receive
(busy-polling with the ZMQ_DONTWAIT/NOWAIT option) on the replier side.
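For clarity, the core of what this flag switches on looks like this (a
sketch; the full versions are ZmqSpinningReceiveData/ZmqSpinningReceiveString
in the listing below):

    zmq_msg_t msg;
    zmq_msg_init (&msg);
    int rc = -1;
    while (rc == -1)    //busy-poll instead of blocking
        rc = zmq_msg_recv (&msg, socket, ZMQ_DONTWAIT);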

Our findings so far:

1. In C++ it is possible to restrict the process and its threads to a
single core. We guess that this removes the synchronization/handover time
between multiple cores. As a result there is no latency ramp (as explained
before), at least for pub/sub; only the first few messages still show some
latency, not the first 500 of every connection. We are still investigating
this, but it might be the solution (see the snippet after this list).

2. There are still latency spikes roughly every 100 ms, which is annoying.
Does anybody have an idea where these come from?

3. Setting the CPU frequency to 100% makes everything faster but is not the
reason for the ramp.

4. set "THREAD_PRIORITY_TIME_CRITICAL" also gave us more performance and 
for pub/sub the ramp disappeared. But it is still there for DEALER/ROUTER.
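For reference, these are the calls from the test program below that
findings 1 and 4 refer to; the mask 0x80 pins everything to core 7 on our
machine and will differ on other hardware:

    //pin the whole process and the calling thread to one core (here: core 7)
    SetProcessAffinityMask(GetCurrentProcess(), 0x80);
    SetThreadAffinityMask(GetCurrentThread(), 0x80);

    //raise the scheduling priority of the calling thread
    SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL);

The same SetThreadAffinityMask/SetThreadPriority pair is repeated at the top
of the subscriber and replier threads.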


Question: Where does the ramp come from?

Code of "ZMQ_Warmup.cpp". It can be compiled to a console application. It 
just needs zmq.
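Something like the following should build it from a Visual Studio command
prompt (a sketch; <zmq_include_dir> and <zmq_lib_dir> are placeholders for
your local ZeroMQ build, and you may need to supply an empty stdafx.h or
drop that include if you build outside the original project):

    cl /EHsc /I<zmq_include_dir> ZMQ_Warmup.cpp /link /LIBPATH:<zmq_lib_dir> libzmq.lib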

// ZMQ_Warmup.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"
#include "zmq.h"
#include <process.h>

// For VS Performance analysis
// #include "VSPerf.h"

#define ZMQ_RAMP_PROBLEM
#define SPINNING_RECEIVE

void StartSubscriber(void* subscriberargs);
void StartReplier(void* context);
void* ZmqReceiveData (void *socket);
char* ZmqReceiveString (void *socket);
void* ZmqSpinningReceiveData (void *socket);
char* ZmqSpinningReceiveString (void *socket);
bool ZmqHasMore (void *socket);

typedef struct
{
        void* context;
        const char* subscription;
}subscriberargs;

typedef struct
{
        const char* longstring;
        int length;
}heavystruct;
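// Note: the test sends this struct by value, i.e. sizeof(heavystruct)
// bytes (a pointer plus a length), not the string it points to. The
// receivers only copy and free those raw bytes and never dereference
// the pointer, which is fine for a pure latency test.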

HANDLE  subscriberdone = CreateEvent( NULL, FALSE, FALSE, NULL );
HANDLE  replierdone = CreateEvent( NULL, FALSE, FALSE, NULL );
HANDLE  messagesreceived = CreateEvent( NULL, FALSE, FALSE, NULL );

FILE    *fid = NULL;
const int       numresults = 1500;
double          timing[numresults];
int         outbufsize[numresults];

LARGE_INTEGER frequency;
LARGE_INTEGER start;
LARGE_INTEGER stop;

void delayMicroSeconds( float microseconds )
{
    __int64 timeElapsed;
    __int64 timeStart;
    __int64 timerFrequency;

    QueryPerformanceFrequency( (LARGE_INTEGER*)(&timerFrequency) );

    double ticksToWait = (double)timerFrequency * (double)microseconds / 1000000.0;

    QueryPerformanceCounter( (LARGE_INTEGER*)(&timeStart) );

    timeElapsed = timeStart;

    //busy-wait until the requested number of ticks has elapsed
    while( ( timeElapsed - timeStart ) < (__int64)ticksToWait )
    {
        QueryPerformanceCounter( (LARGE_INTEGER*)(&timeElapsed) );
    }
}

short OpenLogFile( void)
{
        errno_t e = fopen_s( &fid, "c:\\resultlog.txt", "w");
        return 0;
}

short CloseLogFile( void)
{
        fclose(fid);
        return 0;
}

short LogMeasTiming( int indicator, double time)
{

        timing[indicator] = time;

        return 0;
}

short LogMeasOutBufSize( int indicator, int BufSize)
{

        outbufsize[indicator] = BufSize;

        return 0;
}

short FlushLogFile()
{
        for (int i = 0; i < numresults/3; i++)
        {
                fprintf(fid, "Loop %5d, %6.2f [us], %5d\n", i, timing[i], outbufsize[i]);
        }

        fprintf(fid, "\n");
        fprintf(fid, "********************* END OF SEQ 1 *************************\n");
        fprintf(fid, "\n");

        for (int i = numresults/3; i < 2*numresults/3; i++)
        {
                fprintf(fid, "Loop %5d, %6.2f [us], %5d\n", i, timing[i], outbufsize[i]);
        }

        fprintf(fid, "\n");
        fprintf(fid, "********************* END OF SEQ 2 *************************\n");
        fprintf(fid, "\n");

        for (int i = 2*numresults/3; i < numresults; i++)
        {
                fprintf(fid, "Loop %5d, %6.2f [us], %5d\n", i, timing[i], outbufsize[i]);
        }

        fprintf(fid, "\n");
        fprintf(fid, "********************* END OF SEQ 3 *************************\n");
        fprintf(fid, "\n");

        return 0;
}

double LIToSecs( LARGE_INTEGER & L)
{
        return ((double)L.QuadPart /(double)frequency.QuadPart) ;
}

double getElapsedTimeUs()
{
        LARGE_INTEGER time;
        time.QuadPart = stop.QuadPart - start.QuadPart;
        return (LIToSecs( time) * 1000000) ;
}

int main(int argc, char* argv[])
{
    Sleep(1000);

    //OpenLogFile for the timing data
        OpenLogFile();

        //pin process and main thread to a single core (core 7)
        SetProcessAffinityMask(GetCurrentProcess(), 0x80);
        SetThreadAffinityMask(GetCurrentThread(), 0x80);

        SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL);
        //SetThreadPriorityBoost(GetCurrentThread(), false);

        //create context
        void* context = zmq_ctx_new();

        //initialize stopwatch
        QueryPerformanceFrequency( &frequency );
        start.QuadPart = 0;
        stop.QuadPart = 0;

        //initialize local variables
        int send_suc = 0;
        int retries = 0;
        int sendid_suc = 0;
        int sendaddress_suc = 0;
        int send_data = 0;
        int status = 0;

        //data to send
        const char* subscription = "TestSubscription";
        const char* command = "TestCommandWhichIsExtremLongAndDefinitveMoreThanskajflskhdafashkdgfksdjfkjsadfaksjhdflkjashgdlfjhsdöjakjhföasjkhföasuhgöiuwaefnbobfs29Characters";

        subscriberargs subscriptiondata;
        subscriptiondata.context = context;
        subscriptiondata.subscription = subscription;

        heavystruct data;
        data.longstring = "SuperMegaLongStringThatIsReallyLong";
        data.length = (int) strlen(data.longstring);

        //start subscriber thread
        _beginthread( StartSubscriber, 0, (void*) &subscriptiondata);

        //start replier thread
        _beginthread( StartReplier, 0, context);

        //create publisher
        void* publisher = zmq_socket (context, ZMQ_PUB); //ZMQ_PUB

        int linger = 0, buffsize = 20000;

        //set the options for the PUB socket
        int PubState = zmq_setsockopt ( publisher, ZMQ_LINGER, &linger, sizeof (linger));
        PubState |= zmq_setsockopt ( publisher, ZMQ_SNDBUF, &buffsize, sizeof (buffsize));
        PubState |= zmq_setsockopt ( publisher, ZMQ_RCVBUF, &buffsize, sizeof (buffsize));


        //wait till replier ready
        WaitForSingleObject(replierdone, INFINITE);

        //create requester
        void* requester = zmq_socket (context, ZMQ_DEALER); //ZMQ_REQ

        //set options for the requester socket
        status = zmq_setsockopt ( requester, ZMQ_LINGER, &linger, sizeof (linger));

        char identity [255] = "Identity";
        status |= zmq_setsockopt ( requester, ZMQ_IDENTITY, identity, strlen( identity ) );

        status |= zmq_setsockopt ( requester, ZMQ_RCVBUF, &buffsize, sizeof (buffsize));
        status |= zmq_setsockopt ( requester, ZMQ_SNDBUF, &buffsize, sizeof (buffsize));

        //wait till subscriber did the connect
        WaitForSingleObject(subscriberdone, INFINITE);
        status |= zmq_bind (publisher, "inproc://ZMQ_TEST_PUB_SUB");

        status |= zmq_connect(requester, "inproc://ZMQ_TEST_ROUTER_DEALER");

        //if an error occurred, exit the program
        if(PubState || status)
                exit(-1);

        // ******************************* SEQ 1 (ASYNC) *******************************
        //send 500 multipart messages to the subscriber and log timing data
        for (int i = 0; i < numresults/3; i++)
        {
                //start the timer
                QueryPerformanceCounter(&start);

                //send multipart message
                sendaddress_suc = zmq_send (publisher, subscription, strlen (subscription), ZMQ_SNDMORE);
                send_suc = zmq_send (publisher, command, strlen (command), ZMQ_SNDMORE);
                send_data = zmq_send (publisher, (void*) &data, sizeof (data), 0);

                //stop the timer
                QueryPerformanceCounter(&stop);

#ifdef ZMQ_RAMP_PROBLEM
                //simulates post-send work done by the real program
                delayMicroSeconds(100);
#endif

                //log timing
                LogMeasTiming(i, getElapsedTimeUs());
        }

        Sleep(1000);

        // ******************************* SEQ 2 (SYNC) *******************************
        //send 500 multipart messages to the replier/router
        for (int i = numresults/3; i < 2*numresults/3; i++)
        {
                // For VS Performance analysis
                //MarkProfile( i );

                //send multipart message (empty delimiter frame first, as DEALER talking to ROUTER)
                sendaddress_suc = zmq_send (requester, "", 0, ZMQ_SNDMORE);
                send_suc = zmq_send (requester, command, strlen (command), ZMQ_SNDMORE);

                if(send_suc == -1)
                {
                        //breakpoint anchor for debugging failed sends
                }

                //start the timer
                QueryPerformanceCounter(&start);

                send_data = zmq_send (requester, (void*) &data, sizeof (data), 0);

                //stop the timer
                QueryPerformanceCounter(&stop);

                // For VS Performance analysis
                //MarkProfile( i );

#ifdef ZMQ_RAMP_PROBLEM
                //simulates post-send work done by the real program
                delayMicroSeconds(100);
#endif

                //log timing
                LogMeasTiming(i, getElapsedTimeUs());

                // For VS Performance analysis
                //if(getElapsedTimeUs() > 700)
                //    CommentMarkProfile( i, L"HERE" );

                //receive the reply: free the DEALER/ROUTER delimiter envelope first...
                free(ZmqReceiveData( requester ));

                //...then the payload frame
                free(ZmqReceiveData( requester ));
        }

        //Sleep(1000);

        // ******************************* SEQ 3 (ASYNC) *******************************
        //send 500 multipart messages to the subscriber and log timing data
        for (int i = 2*numresults/3; i < numresults; i++)
        {
                //start the timer
                QueryPerformanceCounter(&start);

                //send multipart message
                sendaddress_suc = zmq_send (publisher, subscription, strlen (subscription), ZMQ_SNDMORE);
                send_suc = zmq_send (publisher, command, strlen (command), ZMQ_SNDMORE);
                send_data = zmq_send (publisher, (void*) &data, sizeof (data), 0);

                //stop the timer
                QueryPerformanceCounter(&stop);

#ifdef ZMQ_RAMP_PROBLEM
                //simulates post-send work done by the real program
                delayMicroSeconds(100);
#endif

                //log timing
                LogMeasTiming(i, getElapsedTimeUs());
        }

        //send final multipart message to subscriber
        sendaddress_suc = zmq_send (publisher, subscription, strlen (subscription), ZMQ_SNDMORE);
        send_suc = zmq_send (publisher, "Ende", 4, ZMQ_SNDMORE);
        send_data = zmq_send (publisher, (void*) &data, sizeof (data), 0);

        //wait till all data has been received by subscriber
        WaitForSingleObject(messagesreceived, INFINITE);

        //close pub and requester/dealer
        status |= zmq_close (publisher); 
        publisher = NULL;
        status |= zmq_close (requester);
        requester = NULL;

        //destroy context
        zmq_ctx_term(context);

        //flush log data and close file
        FlushLogFile();
        CloseLogFile();

        return 0;
}

void StartSubscriber(void* subscriberarguments)
{
    SetThreadAffinityMask(GetCurrentThread(), 0x80);
    SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL);
        //SetThreadPriorityBoost(GetCurrentThread(), false);

        subscriberargs *subscriptiondata = (subscriberargs *) subscriberarguments;

        void* subscriber = zmq_socket (subscriptiondata->context, ZMQ_SUB);

        int linger = 0, buffsize = 20000;
        int HighWaterMark = 1;  //receive high water mark; declared but not applied in this test (ZMQ default is 1000)

        //set linger
        int zmqstatus = zmq_setsockopt ( subscriber, ZMQ_LINGER, &linger, sizeof (linger));

        //set the subscription
        zmqstatus |= zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, subscriptiondata->subscription, strlen(subscriptiondata->subscription));

        zmqstatus |= zmq_connect (subscriber, "inproc://ZMQ_TEST_PUB_SUB");
 
        SetEvent(subscriberdone);

        bool endsubscriber = false;

        while (!endsubscriber)
        {

                void* data = 0;
                char *receivedsubscription = ZmqReceiveString(subscriber);

                //if nothing was received or the wrong subscription came in
                if(receivedsubscription == NULL || strcmp(receivedsubscription, subscriptiondata->subscription) != 0)
                {
                        //free and stop
                        free(receivedsubscription);
                        break;
                }

                //then the command
                char *command = ZmqReceiveString(subscriber);

                //check if it is the last message
                if(command != NULL && strcmp(command, "Ende") == 0)
                        endsubscriber = true;

                //Check if one more message is coming
                if(ZmqHasMore(subscriber)) 
                {
                        //receive the last frame
                        data = ZmqReceiveData(subscriber);
                }

                //free memory
                free(receivedsubscription);
                free(command);
                free(data);
        }

        //close the sub socket
        zmqstatus |= zmq_close (subscriber);
        subscriber = NULL;

        //tell main thread that subscriber is closed
        SetEvent(messagesreceived);
 
        return;
}

void StartReplier(void* replierarguments)
{
    SetThreadAffinityMask(GetCurrentThread(), 0x80);

    SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL);
        //SetThreadPriorityBoost(GetCurrentThread(), false);

        //create replier/router socket
        void* replier = zmq_socket (replierarguments, ZMQ_ROUTER);

        int linger = 0, buffsize = 20000;

        //set socket options
        int zmqstatus = zmq_setsockopt ( replier, ZMQ_LINGER, &linger, sizeof (linger));
        zmqstatus |= zmq_setsockopt ( replier, ZMQ_RCVBUF, &buffsize, sizeof (buffsize));
        zmqstatus |= zmq_setsockopt ( replier, ZMQ_SNDBUF, &buffsize, sizeof (buffsize));

        //bind replier socket
        zmqstatus |= zmq_bind (replier, "inproc://ZMQ_TEST_ROUTER_DEALER");
 
        //tell main thread replier is set
        SetEvent(replierdone);

        int counter = 0;

        //run for 500 messages
        while (counter < numresults/3)
        {
                void* Data = 0;

                //first get identity
        #ifdef SPINNING_RECEIVE
                    char *identity = ZmqSpinningReceiveString(replier);
        #else
            char *identity = ZmqReceiveString(replier);
        #endif

                //if nothing was received, stop
                if(identity == NULL)
                        break;

                //free the envelope
        #ifdef SPINNING_RECEIVE
                    free(ZmqSpinningReceiveString(replier));
        #else
            free(ZmqReceiveString(replier));
        #endif
 

                //then get the command
        #ifdef SPINNING_RECEIVE
                    char *command = ZmqSpinningReceiveString(replier);
        #else
            char *command = ZmqReceiveString(replier);
        #endif
 

                //check if there is more
                if(ZmqHasMore(replier)) 
                {
                        //receive final message
            #ifdef SPINNING_RECEIVE
                        Data = ZmqSpinningReceiveData(replier);
            #else
                Data = ZmqReceiveData(replier);
            #endif
 
                }

        //Sleep(1);

                //send the reply: identity frame for routing, empty delimiter, then payload
                zmq_send(replier, "Identity", strlen("Identity"), ZMQ_SNDMORE);
                zmq_send(replier, "", 0, ZMQ_SNDMORE);
                zmq_send(replier, "ReceiveComplete", 15, 0);

                //Free memory
                free(identity);
                free(command);
                free(Data); 

                //counts the messages received
                counter++;
        }

        //close replier
        zmqstatus |= zmq_close (replier);
        replier = NULL;
 
        return;
}


void* ZmqSpinningReceiveData (void *socket) {
        zmq_msg_t msg;
        int rc = zmq_msg_init (&msg);
        //assert (rc == 0);

        //busy-poll until a frame arrives; zmq_msg_recv returns -1 (EAGAIN) while the queue is empty
        rc = -1;
        while( rc == -1 )
        {
                rc = zmq_msg_recv (&msg, socket, ZMQ_DONTWAIT);
                //Sleep(0);
        }

        //copy the payload out of the zmq message so the caller owns it
        void *rcvdata = zmq_msg_data(&msg);

        void* buffer = malloc( rc );
        memcpy_s( buffer, rc, rcvdata, rc);

        zmq_msg_close (&msg);

        return buffer;
}


void* ZmqReceiveData (void *socket) {
        zmq_msg_t msg;
        int rc = zmq_msg_init (&msg);
        //assert (rc == 0);

        /* Block until a message is available to be received from socket */
        rc = zmq_msg_recv (&msg, socket, 0);
        if(rc==-1)
        {
                zmq_msg_close (&msg);
                return NULL;
        }

        void *rcvdata = zmq_msg_data(&msg);

        void* buffer = malloc( rc );
        memcpy_s( buffer, rc, rcvdata, rc);

        zmq_msg_close (&msg);

        return buffer;
}


char* ZmqSpinningReceiveString (void *socket) {
        zmq_msg_t msg;
        int rc = zmq_msg_init (&msg);

        //busy-poll until a frame arrives; zmq_msg_recv returns -1 (EAGAIN) while the queue is empty
        rc = -1;
        while( rc == -1 )
        {
                rc = zmq_msg_recv (&msg, socket, ZMQ_DONTWAIT);
                //Sleep(0);
        }

        void* rcvdata = zmq_msg_data(&msg);

        //copy into a NUL-terminated string owned by the caller
        char* buffer = (char*) malloc( rc + 1 );
        memcpy_s(buffer, rc + 1, rcvdata, rc);
        buffer[rc] = '\0';

        zmq_msg_close (&msg);

        return buffer;
}


char* ZmqReceiveString (void *socket) {
        zmq_msg_t msg;
        int rc = zmq_msg_init (&msg);

        /* Block until a message is available to be received from socket */
        rc = zmq_msg_recv (&msg, socket, 0);
        //assert (rc != -1);

        if(rc==-1)
        {
                zmq_msg_close (&msg);
                return NULL;
        }

        void* rcvdata = zmq_msg_data(&msg);

        //copy into a NUL-terminated string owned by the caller
        char* buffer = (char*) malloc( rc + 1 );
        memcpy_s(buffer, rc + 1, rcvdata, rc);
        buffer[rc] = '\0';

        zmq_msg_close (&msg);

        return buffer;
}

bool ZmqHasMore (void *socket)
{
        int more = 0;           //multipart detection; ZMQ_RCVMORE is an int in ZMQ 4.x
        size_t more_size = sizeof (more);
        zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);

        return more != 0;
}

From:    Pieter Hintjens <[email protected]>
To:      ZeroMQ development list <[email protected]>
Cc:      [email protected]
Date:    30.09.2015 14:50
Subject: Re: [zeromq-dev] Bad ZMQ latency on first 500 messages per connection
Sent by: [email protected]



Before tweaking the send calls, I'd advise making a *minimal* test
case that shows the effect, in C. We can then see exactly where the
latency spike is coming from. No point in trying to remove an effect
we don't understand (in fact, counter productive, we need to
understand the spike first.)

On Wed, Sep 30, 2015 at 8:31 AM,  <[email protected]> wrote:
> We also agree that there must be a buffer somewhere in the TCP stack or
> OS/memory (page allocation)/scheduling. It becomes clear, as Ben mentioned,
> when the sender sends a lot more messages than the receiver receives.
>
> A time delay for the TCP connection could be a reason for that, as Pieter
> asked. But in our scenario we do the zmq_connect/zmq_bind at the beginning,
> which is not part of the measured results.
> We use inproc communication with ZMQ 4.0.4 in a C++ project with "zmq_send".
> All message data we have is memory-aligned structures (memory blocks). Does
> this mean we could also utilize "zmq_send_const"? We could then eliminate at
> least one copy operation, if I am right.
>
>
>
>> Date: Tue, 29 Sep 2015 09:44:19 +0200
>> From: Pieter Hintjens <[email protected]>
>> Subject: Re: [zeromq-dev] Bad ZMQ latency on first 500 messages per connection
>> To: ZeroMQ development list <[email protected]>
>> Message-ID: <CADL5_sgpYKi=k2doeObsrAkmGbfzv6SMcfe=q2qwp0_zcoa...@mail.gmail.com>
>> Content-Type: text/plain; charset=UTF-8
>>
>> Have you excluded the cost of establishing the TCP connection itself
>> (something around 10 msec, depending on the setup)?
>>
>> On Tue, Sep 29, 2015 at 8:49 AM, Ben Kloosterman <[email protected]> wrote:
>>> "As can be seen, sender and receiver throughputs differ at the beginning
>>> of the test. This seems to suggest that there is some kind of buffering
>>> involved on the lower layers of the stack"
>>>
>>> Nagle is turned off but it looks like it keeps increasing some sort of
>>> buffer in the TCP stack.. could also be OS / memory (page allocation) /
>>> scheduling priority.
>>>
>>> Note that in those 500 messages the sender was sending a lot more messages
>>> than the receiver was receiving, so a buffer would be growing somewhere.
>>>
>>> Ben
>>>
>>> On Tue, Sep 29, 2015 at 4:35 PM, <[email protected]> wrote:
>>>>
>>>> Hi all,
>>>>
>>>> when sending many messages at once there is a known latency issue: the
>>>> first 500 messages are sent with a high latency, shown in the graph in
>>>> the whitepaper http://zeromq.org/whitepapers:measuring-performance,
>>>> before it gets to very high speed messaging.
>>>>
>>>> The author summarizes:
>>>> * For latency: ... the latency for the first 500 messages is quite poor
>>>> (up to 1,500 microseconds), however, afterwards it stabilises at
>>>> approximately 150 microseconds, with occasional peaks up to 400
>>>> microseconds.
>>>> * For throughput: This seems to suggest that there is some kind of
>>>> buffering involved on the lower layers of the stack.
>>>>
>>>> In our scenario, we use a Dealer/Router connection, which is in fact
>>>> like REQ/REP.
>>>>
>>>> Question:
>>>> Why do the first 500 messages have this latency issue? What is the
>>>> reason for this, and how could we solve it? We require a high
>>>> performance application even for the first messages.
>>>>