>It could be Linux kernel behavior. You can try with a real time kernel.
We are on a Windows 7 64-bit machine (Core i7-3770 CPU @ 4.40 GHz, 16 GB
RAM), so in our case it would be Windows kernel behavior. Unfortunately I
don't have a real-time kernel, but I will follow up on this hint.
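To repeat the measurement on a (real-time) Linux kernel, the Windows-only timing helpers in the attached source (delayMicroSeconds() and the QueryPerformanceCounter stopwatch) need POSIX counterparts. The following is a minimal sketch, assuming a POSIX system that provides clock_gettime() with CLOCK_MONOTONIC; the names now_us() and delay_us_busy() are hypothetical and only mirror the Windows helpers.

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 199309L /* assumption: needed for clock_gettime in strict C modes */
#endif
#include <assert.h>
#include <time.h>

/* Current monotonic time in microseconds (unaffected by wall-clock changes). */
double now_us(void)
{
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (double)ts.tv_sec * 1e6 + (double)ts.tv_nsec / 1e3;
}

/* Busy-wait for the given number of microseconds, like delayMicroSeconds():
 * the thread keeps polling the clock instead of sleeping. */
void delay_us_busy(double microseconds)
{
    assert(microseconds >= 0.0);
    const double start = now_us();
    while (now_us() - start < microseconds) {
        /* spin */
    }
}
```

Timing one send then becomes `double t0 = now_us();`, the zmq_send() calls, and `timing[i] = now_us() - t0;`. The busy-wait is kept on purpose so that the delay behaves like delayMicroSeconds(100) in the original test and does not hand the CPU to the scheduler.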

>Before tweaking the send calls, I'd advise making a *minimal* test
>case that shows the effect, in C. We can then see exactly where the
>latency spike is coming from. No point in trying to remove an effect
>we don't understand (in fact, counter productive, we need to
>understand the spike first.)

We sent you the C source code in our last message, as you asked. Were you
able to reproduce the issue?

>It would be really interesting and useful to other people if you could
>write your experiences up in a blog or article somewhere...
I have a problem, and if the latency of roughly the first 500 messages
over inproc really is this large, it should be a problem for others too.
I post to the list because this is the right place for such questions,
and because if I can't find a solution, or at least the reason for this
behavior, it may be an (at least) medium-term property of ZMQ.
If so (please correct me), then for me this is the point where I start
thinking about replacing ZMQ in our software, because we can't live with
it, and others should also know about this behavior.

New findings:
* We had the compiler set to build 32-bit code. Switching to 64 bit gave
us more performance and reduced the latencies, which is good news. But
the main issues, the spikes and the latency ramp, are still there.



Summary so far:
* Compiling for 64 bit reduced the latencies, but the main issues are
still there: spikes and the latency ramp.
* Inproc communication for PUB/SUB and ROUTER/DEALER shows a latency ramp
over the first ca. 500 messages.
* The problem for DEALER/ROUTER is that sending 172 bytes over inproc
takes up to 800 us = 0.8 ms (!!!) at the end of the ramp, and 80-130 us
before/after the ramp, with all optimizations applied (thread priority =
TIME_CRITICAL and the software running on a single core).
* The latency ramp appears if there is a delay between the send calls,
either from delayMicroSeconds(100) (PUB/SUB) or from the calls to
free(ZmqReceiveData( requester )) (ROUTER/DEALER).
* For PUB/SUB the communication is OK with all optimizations turned on
(thread priority = TIME_CRITICAL and the software running on a single
core). We do not know yet how more than two time-critical threads will
affect this; maybe they will reduce the performance.
* In general, the first message has a large latency for both inproc
PUB/SUB and ROUTER/DEALER. For PUB/SUB the first message takes ca.
100 us and the remaining messages 0.3-4.0 us.
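To compare the ramp and the spikes between runs with numbers rather than by eye, the per-message send times can be summarized per window. This is a minimal sketch, not part of the attached test program: it assumes the send times in microseconds are in an array like timing[] in the source, and the names window_stats and summarize_window are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical summary of one window of per-message send times (in us). */
struct window_stats {
    double mean_us;   /* average send time in the window          */
    double max_us;    /* worst send time in the window            */
    size_t max_index; /* message index of the worst send time     */
    size_t spikes;    /* samples strictly above spike_threshold_us */
};

/* Summarize timing[from, to): mean, maximum (and where it occurs) and the
 * number of samples above spike_threshold_us. Requires from < to. */
struct window_stats summarize_window(const double *timing, size_t from,
                                     size_t to, double spike_threshold_us)
{
    assert(from < to);

    struct window_stats s = {0.0, timing[from], from, 0};
    double sum = 0.0;

    for (size_t i = from; i < to; i++) {
        sum += timing[i];
        if (timing[i] > s.max_us) {
            s.max_us = timing[i];
            s.max_index = i;
        }
        if (timing[i] > spike_threshold_us)
            s.spikes++;
    }
    s.mean_us = sum / (double)(to - from);
    return s;
}
```

For example, comparing summarize_window(timing, 0, 500, 100.0) with summarize_window(timing, 500, 1000, 100.0) would show whether the first ca. 500 messages of SEQ 1 are slower on average than the rest, and where the worst sample sits; the 100 us threshold is only an illustrative choice.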


Hopefully somebody out there can give us some hints or advice on how to
proceed.


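Please find attached the current source code with defines for testing. It
writes the timings to a file in C:\ (c:\resultlog.txt). It also has a
define, SPINNING_RECEIVE, which switches the receiver (replier) thread
between spinning on a non-blocking receive and a blocking receive. As
attached, it is disabled by the trailing underscore in
#define SPINNING_RECEIVE_.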
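For post-processing, every data line that FlushLogFile() writes to the log has the form "Loop %5d, %6.4f [us]"; the blank lines and "END OF SEQ" separators carry no data. A minimal parsing sketch, assuming exactly that format (parse_log_line is a hypothetical helper, not part of the attached program):

```c
#include <assert.h>
#include <stdio.h>

/* Parse one line written by FlushLogFile(), e.g. "Loop    12, 3.1416 [us]".
 * Returns 1 and fills *index and *time_us on success, 0 for any other line
 * (blank lines and the "END OF SEQ" separators). */
int parse_log_line(const char *line, int *index, double *time_us)
{
    int i;
    double t;

    assert(line != NULL && index != NULL && time_us != NULL);
    if (sscanf(line, "Loop %d, %lf", &i, &t) != 2)
        return 0;
    *index = i;
    *time_us = t;
    return 1;
}
```

Reading the file line by line with fgets() and keeping only the lines for which parse_log_line() returns 1 recovers the timing[] array of a run.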

// ZMQ_Warmup.cpp : Defines the entry point for the console application.
//

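#include "stdafx.h"
#include "zmq.h"
#include <process.h>
// Win32 API and C runtime headers used below (may already come via stdafx.h)
#include <windows.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>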

// For VS Performance analysis
// #include "VSPerf.h"

#define ZMQ_RAMP_PROBLEM
#define SPINNING_RECEIVE_      // trailing "_" disables it; rename to SPINNING_RECEIVE to spin in the replier
#define THREAD_PRIO_CRITICAL
#define SINGLE_CORE

void StartSubscriber(void* subscriberargs);
void StartReplier(void* context);
void* ZmqReceiveData (void *socket);
char* ZmqReceiveString (void *socket);
void* ZmqSpinningReceiveData (void *socket);
char* ZmqSpinningReceiveString (void *socket);
bool ZmqHasMore (void *socket);

typedef struct
{
        void* context;
        const char* subscription;
}subscriberargs;

typedef struct
{
        const char* longstring;
        int length;
}heavystruct;

HANDLE  subscriberdone = CreateEvent( NULL, FALSE, FALSE, NULL );
HANDLE  replierdone = CreateEvent( NULL, FALSE, FALSE, NULL );
HANDLE  messagesreceived = CreateEvent( NULL, FALSE, FALSE, NULL );

FILE        *fid = NULL;
const int       numresults = 3000;
double          timing[numresults];

LARGE_INTEGER frequency;
LARGE_INTEGER start;
LARGE_INTEGER stop;

void delayMicroSeconds( float microseconds )
{
    // Busy-wait on the performance counter (no Sleep), so the thread
    // keeps the CPU for the whole delay
    LARGE_INTEGER frequencyTicks;
    LARGE_INTEGER timeStart;
    LARGE_INTEGER timeNow;

    QueryPerformanceFrequency( &frequencyTicks );

    // number of counter ticks that correspond to the requested delay
    const __int64 ticksToWait =
        (__int64)( (double)frequencyTicks.QuadPart * (double)microseconds / 1000000.0 );

    QueryPerformanceCounter( &timeStart );
    timeNow = timeStart;

    while( ( timeNow.QuadPart - timeStart.QuadPart ) < ticksToWait )
    {
        QueryPerformanceCounter( &timeNow );
    }
}

short OpenLogFile( void)
{
        // returns 0 on success, otherwise the errno_t reported by fopen_s
        errno_t e = fopen_s( &fid, "c:\\resultlog.txt", "w");
        return (short)e;
}

short CloseLogFile( void)
{
        fclose(fid);
        return 0;
}

short LogMeasTiming( int indicator, double time)
{

        timing[indicator] = time;

        return 0;
}

short FlushLogFile()
{
        // timing[] holds three equally sized sequences (SEQ 1..3)
        const int perSeq = numresults / 3;

        for (int seq = 0; seq < 3; seq++)
        {
                for (int i = seq * perSeq; i < (seq + 1) * perSeq; i++)
                {
                        fprintf(fid, "Loop %5d, %6.4f [us]\n", i, timing[i]);
                }

                fprintf(fid, "\n");
                fprintf(fid, "********************* END OF SEQ %d *************************\n", seq + 1);
                fprintf(fid, "\n");
        }

        return 0;
}

double LIToSecs( LARGE_INTEGER & L)
{
        return ((double)L.QuadPart /(double)frequency.QuadPart) ;
}

double getElapsedTimeUs()
{
        LARGE_INTEGER time;
        time.QuadPart = stop.QuadPart - start.QuadPart;
        return (LIToSecs( time) * 1000000) ;
}

int main(int argc, char* argv[])
{
    Sleep(1000);

 

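        //open the log file for the timing data
        OpenLogFile();

    #ifdef SINGLE_CORE
        // Pin the process to logical CPU 7 (mask 0x80).
        // Note: a thread affinity mask must be a subset of the process
        // affinity mask. 0x50 here (and 0x70/0x60 in the worker threads)
        // does not overlap 0x80, so SetThreadAffinityMask() fails and
        // returns 0; all threads run on logical CPU 7 only.
        SetProcessAffinityMask(GetCurrentProcess(), 0x80);
        SetThreadAffinityMask(GetCurrentThread(), 0x50);
    #endif

    #ifdef THREAD_PRIO_CRITICAL
        SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL);
    #endif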

        //SetThreadPriorityBoost(GetCurrentThread(), false);

        //create context
        void* context = zmq_ctx_new();

 

        //initialize stopwatch
        QueryPerformanceFrequency( &frequency ) ;
        start.QuadPart = 0;
        stop.QuadPart = 0;

        //initialise local variables
        int send_suc = 0;
    int retries = 0;
    int sendid_suc = 0; 
    int sendaddress_suc = 0;
    int send_data = 0;
        int status = 0;

        //data to send
        const char* subscription = "TestSubscription";
        const char* command = "TestCommandWhichIsExtremLongAndDefinitveMoreThanskajflskhdafashkdgfksdjfkjsadfaksjhdflkjashgdlfjhsdöjakjhföasjkhföasuhgöiuwaefnbobfs29Characters";
 
        subscriberargs subscriptiondata;
        subscriptiondata.context = context;
        subscriptiondata.subscription = subscription;

        heavystruct data;
        data.longstring = "SuperMegaLongStringThatIsReallyLong";
        data.length = strlen(data.longstring);

        //start subscriber thread
        _beginthread( StartSubscriber, 0, (void*) &subscriptiondata);

        //start replier thread
        _beginthread( StartReplier, 0, context);

        //create publisher
        void* publisher = zmq_socket (context, ZMQ_PUB); //ZMQ_PUB

        int linger = 0, buffsize = 20000;

        //set the options for the PUB socket
        //(ZMQ_SNDBUF/ZMQ_RCVBUF size the kernel socket buffers, so they
        // should have no effect on the inproc transport)
        int PubState = zmq_setsockopt ( publisher, ZMQ_LINGER, &linger, sizeof (linger));
        PubState |= zmq_setsockopt ( publisher, ZMQ_SNDBUF, &buffsize, sizeof (buffsize));
        PubState |= zmq_setsockopt ( publisher, ZMQ_RCVBUF, &buffsize, sizeof (buffsize));

        //wait till replier ready
        WaitForSingleObject(replierdone, INFINITE);

        //create requester
        void* requester = zmq_socket (context, ZMQ_DEALER); //ZMQ_REQ

    //Set options for the requester socket
    int s = zmq_setsockopt ( requester, ZMQ_LINGER, &linger, sizeof (linger));

    char identity [255] = "Identity";
    status = zmq_setsockopt ( requester, ZMQ_IDENTITY, identity, strlen( identity ) );

    zmq_setsockopt(requester, ZMQ_RCVBUF, &buffsize, sizeof (buffsize));
    zmq_setsockopt(requester, ZMQ_SNDBUF, &buffsize, sizeof (buffsize));

        //wait till subscriber did the connect
        WaitForSingleObject(subscriberdone, INFINITE);
        status |= zmq_bind (publisher,  "inproc://ZMQ_TEST_PUB_SUB" );

        status |= zmq_connect(requester, "inproc://ZMQ_TEST_ROUTER_DEALER");

        //if an error occurred (socket options, bind or connect), exit
        if(PubState || status)
                exit(-1);

    // ******************************* SEQ 1 (ASYNC) *******************************
        //send numresults/3 (= 1000) multi-part messages to the subscriber and log timing data
        for (int i = 0; i < numresults/3; i++)
        {
                //start the timer
                QueryPerformanceCounter(&start);

                //send multipart message
                sendaddress_suc = zmq_send (publisher, subscription, strlen (subscription), ZMQ_SNDMORE);
                send_suc = zmq_send (publisher, command, strlen (command), ZMQ_SNDMORE);
                send_data = zmq_send (publisher, (void*) &data, sizeof (data), 0);

                //stop the timer
                QueryPerformanceCounter(&stop);

        #ifdef ZMQ_RAMP_PROBLEM
                //simulates other operations the program performs after sending
                delayMicroSeconds(100);
        #endif

                //Log timing
                LogMeasTiming(i, getElapsedTimeUs());
        }

 
    Sleep(1000);

    // ******************************* SEQ 2 (SYNC) *******************************
        //send numresults/3 (= 1000) multi-part messages to the replier/router
        for (int i = numresults/3; i < 2*numresults/3; i++)
        {
                // For VS Performance analysis
                //MarkProfile( i );

                //start the timer
                QueryPerformanceCounter(&start);

                //send multipart message (empty delimiter, command, data)
                sendaddress_suc = zmq_send (requester, "", 0, ZMQ_SNDMORE);
                send_suc = zmq_send (requester, command, strlen (command), ZMQ_SNDMORE);
                send_data = zmq_send (requester, (void*) &data, sizeof (data), 0);

                //stop the timer
                QueryPerformanceCounter(&stop);

                // For VS Performance analysis
                //MarkProfile( i );

        #ifdef ZMQ_RAMP_PROBLEM
                //simulates other operations the program performs after sending
                //delayMicroSeconds(100);  //-->The delay of "free" is enough.

                // For VS Performance analysis
                //if(getElapsedTimeUs() > 700)
                //    CommentMarkProfile( i, L"HERE" );

                //receive and free the empty envelope (delimiter) frame of the reply
                free(ZmqReceiveData( requester )); //DEALER/ROUTER combination envelope

                //get the rest of the reply and free the memory
                free(ZmqReceiveData( requester ));
        #endif

                if(i==1530)
                        Sleep(1);

                //Log timing
                LogMeasTiming(i, getElapsedTimeUs());
        }
 
    //Sleep(1000);

    // ******************************* SEQ 3 (ASYNC) *******************************
        //send numresults/3 (= 1000) multi-part messages to the subscriber and log timing data
        for (int i = 2* numresults/3; i < 3* numresults/3; i++)
        {
                //start the timer
                QueryPerformanceCounter(&start);

                //send multipart message
                sendaddress_suc = zmq_send (publisher, subscription, strlen (subscription), ZMQ_SNDMORE);
                send_suc = zmq_send (publisher, command, strlen (command), ZMQ_SNDMORE);
                send_data = zmq_send (publisher, (void*) &data, sizeof (data), 0);

                //stop the timer
                QueryPerformanceCounter(&stop);

#ifdef ZMQ_RAMP_PROBLEM
                //simulates other operations the program performs after sending
                delayMicroSeconds(100);
#endif

                //Log timing
                LogMeasTiming(i, getElapsedTimeUs());
        }

        //send final multipart message ("Ende" = end marker) to subscriber
        sendaddress_suc = zmq_send (publisher, subscription, strlen (subscription), ZMQ_SNDMORE);
        send_suc = zmq_send (publisher, "Ende", 4, ZMQ_SNDMORE);
        send_data = zmq_send (publisher, (void*) &data, sizeof (data), 0);

        //wait till all data has been received by subscriber
        WaitForSingleObject(messagesreceived, INFINITE);

        //close pub and requester/dealer
        status |= zmq_close (publisher); 
        publisher = NULL;
        status |= zmq_close (requester);
        requester = NULL;

        //destroy context
        zmq_ctx_term(context);

        //flush log data and close file
        FlushLogFile();
        CloseLogFile();

        return 0;
}

void StartSubscriber(void* subscriberarguments)
{
    #ifdef SINGLE_CORE
        SetThreadAffinityMask(GetCurrentThread(), 0x70);
    #endif

    #ifdef THREAD_PRIO_CRITICAL
        SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL);
    #endif

        //SetThreadPriorityBoost(GetCurrentThread(), false);

        subscriberargs *subscriptiondata = (subscriberargs *) subscriberarguments;

        void* subscriber = zmq_socket (subscriptiondata->context, ZMQ_SUB);

        int linger = 0, buffsize = 20000;
        int HighWaterMark = 1;  //currently unused: ZMQ_RCVHWM is not set, so the default (1000) applies

        //Set Linger
        int zmqstatus = zmq_setsockopt ( subscriber, ZMQ_LINGER, &linger, sizeof (linger));
 


        //set the subscription
        zmqstatus |= zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, subscriptiondata->subscription, strlen(subscriptiondata->subscription));

        zmqstatus |= zmq_connect (subscriber, "inproc://ZMQ_TEST_PUB_SUB");
 
        SetEvent(subscriberdone);

        bool endsubsciber = false;

        while (!endsubsciber)
        {

                void* data = 0;
                char *receivedsubscription = ZmqReceiveString(subscriber);

                //if nothing received or false received
                if(receivedsubscription == NULL || strcmp(receivedsubscription, subscriptiondata->subscription) != 0)
                {
                        //free the received subscription frame
                        free(receivedsubscription);
                        break;
                }

                //then the command
                char *command = ZmqReceiveString(subscriber);

                //check if it is the last message ("Ende" = end marker)
                if(command != NULL && strcmp(command, "Ende") == 0)
                        endsubsciber = true;

                //Check if one more message is coming
                if(ZmqHasMore(subscriber)) 
                {
                        //receive the last frame
                        data = ZmqReceiveData(subscriber);
                }

                //free memory
                free(receivedsubscription);
                free(command);
                free(data);
        }

        //close the sub socket
        zmqstatus |= zmq_close (subscriber);
        subscriber = NULL;

        //tell main thread that subscriber is closed
        SetEvent(messagesreceived);
 
        return;
}

void StartReplier(void* replierarguments)
{
    #ifdef SINGLE_CORE
        SetThreadAffinityMask(GetCurrentThread(), 0x60);
    #endif

    #ifdef THREAD_PRIO_CRITICAL
        SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL);
    #endif

        //SetThreadPriorityBoost(GetCurrentThread(), false);

        //create replier/router socket
        void* replier = zmq_socket (replierarguments, ZMQ_ROUTER);

        int linger = 0, buffsize = 20000;

        //set socket options
        int zmqstatus = zmq_setsockopt ( replier, ZMQ_LINGER, &linger, sizeof (linger));
        zmqstatus |= zmq_setsockopt ( replier, ZMQ_RCVBUF, &buffsize, sizeof (buffsize));
        zmqstatus |= zmq_setsockopt ( replier, ZMQ_SNDBUF, &buffsize, sizeof (buffsize));


        //bind replier socket
        zmqstatus |= zmq_bind (replier, "inproc://ZMQ_TEST_ROUTER_DEALER");
 
        //tell main thread replier is set
        SetEvent(replierdone);

        int counter = 0;

        //run for numresults/3 (= 1000) messages
        while (counter < numresults/3)
        {
                void* Data = 0;

                //first get identity
        #ifdef SPINNING_RECEIVE
                    char *identity = ZmqSpinningReceiveString(replier);
        #else
            char *identity = ZmqReceiveString(replier);
        #endif

                //if nothing received or false received
                if(identity == NULL)
                {
                        free(identity);
                        break;
                }

                //free the envelope
        #ifdef SPINNING_RECEIVE
                    free(ZmqSpinningReceiveString(replier));
        #else
            free(ZmqReceiveString(replier));
        #endif
 

                //then get the command
        #ifdef SPINNING_RECEIVE
                    char *command = ZmqSpinningReceiveString(replier);
        #else
            char *command = ZmqReceiveString(replier);
        #endif
 

                //check is there is more
                if(ZmqHasMore(replier)) 
                {
                        //receive final message
            #ifdef SPINNING_RECEIVE
                        Data = ZmqSpinningReceiveData(replier);
            #else
                Data = ZmqReceiveData(replier);
            #endif
 
                }

        //Sleep(1);

                //send the reply
                zmq_send(replier, "Identity", strlen("Identity"), ZMQ_SNDMORE);
                zmq_send(replier, "", 0, ZMQ_SNDMORE);
                zmq_send(replier, "ReceiveComplete", 15, 0);

                //Free memory
                free(identity);
                free(command);
                free(Data); 

                //counts the messages received
                counter++;

        }

        //close replier
        zmqstatus |= zmq_close (replier);
        replier = NULL;
 
        return;
}


void* ZmqSpinningReceiveData (void *socket) {
        zmq_msg_t msg;
        int rc = zmq_msg_init (&msg);
        //assert (rc == 0);

        //spin on a non-blocking receive; retry only while no message is
        //available yet (EAGAIN), stop on any other error (e.g. ETERM)
        do
        {
                rc = zmq_msg_recv (&msg, socket, ZMQ_DONTWAIT);
                //Sleep(0);
        } while( rc == -1 && zmq_errno() == EAGAIN );

        if(rc==-1)
        {
                zmq_msg_close (&msg);
                return NULL;
        }

        void *rcvdata = zmq_msg_data(&msg);

        void* buffer = malloc( rc );
        memcpy_s( buffer, rc, rcvdata, rc);

        zmq_msg_close (&msg);

        return buffer;
}


void* ZmqReceiveData (void *socket) {
        zmq_msg_t msg;
        int rc = zmq_msg_init (&msg);
        //assert (rc == 0);
        /* Block until a message is available to be received from socket */
        rc = zmq_msg_recv (&msg, socket, 0);
        if(rc==-1)
        {
                zmq_msg_close (&msg);
                return NULL;
        }

        void *rcvdata = zmq_msg_data(&msg);

        void* buffer = malloc( rc );
        memcpy_s( buffer, rc, rcvdata, rc);

        zmq_msg_close (&msg);

        return buffer;
}


char* ZmqSpinningReceiveString (void *socket) {
        zmq_msg_t msg;
        int rc = zmq_msg_init (&msg);

        //spin on a non-blocking receive; retry only while no message is
        //available yet (EAGAIN), stop on any other error (e.g. ETERM)
        do
        {
                rc = zmq_msg_recv (&msg, socket, ZMQ_DONTWAIT);
                //Sleep(0);
        } while( rc == -1 && zmq_errno() == EAGAIN );

        //assert (rc != -1);

        if(rc==-1)
        {
                zmq_msg_close (&msg);
                return NULL;
        }

        void* rcvdata = zmq_msg_data(&msg);

        //copy the frame and add the terminating '\0'
        char* buffer = (char*) malloc( rc + 1 );
        memcpy_s(buffer, rc + 1, rcvdata, rc);
        buffer[rc] = '\0';

        zmq_msg_close (&msg);

        return buffer;
}


char* ZmqReceiveString (void *socket) {
        zmq_msg_t msg;
        int rc = zmq_msg_init (&msg);

        /* Block until a message is available to be received from socket */
        rc = zmq_msg_recv (&msg, socket, 0);
        //assert (rc != -1);

        if(rc==-1)
        {
                zmq_msg_close (&msg);
                return NULL;
        }

        void* rcvdata = zmq_msg_data(&msg);

        //copy the frame and add the terminating '\0'
        char* buffer = (char*) malloc( rc + 1 );
        memcpy_s(buffer, rc + 1, rcvdata, rc);
        buffer[rc] = '\0';
 
        zmq_msg_close (&msg);

 
        return buffer;
}

bool ZmqHasMore (void *socket)
{
        int more = 0;           //  Multipart detection (ZMQ_RCVMORE is an int in ZMQ 3.x/4.x)
        size_t more_size = sizeof (more);
        zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);

        return more != 0;
}
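A spin loop around a non-blocking receive such as zmq_msg_recv(&msg, socket, ZMQ_DONTWAIT) should only retry while the call fails with EAGAIN ("no message yet", checked via zmq_errno()); any other error, for example ETERM after the context is terminated, has to end the loop, otherwise the thread spins forever at TIME_CRITICAL priority. The retry rule can be sketched without ZeroMQ; try_fn, spin_until_ready and the two fake receivers are hypothetical stand-ins for the non-blocking receive, not libzmq API.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

/* Hypothetical non-blocking operation: returns >= 0 on success, or -1 with
 * errno set (EAGAIN means "nothing available yet, try again"). */
typedef int (*try_fn)(void *arg);

/* Spin on a non-blocking call, retrying only on EAGAIN. Returns the first
 * non-negative result, or -1 on any other error (errno as set by the call).
 * If spins is not NULL, it receives the number of retries. */
int spin_until_ready(try_fn try_op, void *arg, long *spins)
{
    long n = 0;
    int rc;

    assert(try_op != NULL);
    while ((rc = try_op(arg)) == -1 && errno == EAGAIN)
        n++;
    if (spins != NULL)
        *spins = n;
    return rc;
}

/* Fake receiver: fails with EAGAIN until *pending reaches zero, then
 * "receives" a 15-byte frame. */
int fake_recv(void *arg)
{
    int *pending = (int *)arg;

    if (*pending > 0) {
        (*pending)--;
        errno = EAGAIN;
        return -1;
    }
    return 15;
}

/* Fake receiver that fails permanently with a non-EAGAIN error. */
int fake_recv_error(void *arg)
{
    (void)arg;
    errno = EINVAL;
    return -1;
}
```

With fake_recv and three pending "not yet" results the loop retries three times and returns 15; with fake_recv_error it returns -1 immediately instead of spinning.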




From:   Pieter Hintjens <[email protected]>
To:     ZeroMQ development list <[email protected]>,
Date:   10.10.2015 01:45
Subject:        Re: [zeromq-dev] Bad ZMQ latency on first 500 messages per connection
Sent by:        [email protected]



On Fri, Oct 9, 2015 at 11:02 AM,  <[email protected]> wrote:

> Our findings so far:

It would be really interesting and useful to other people if you could
write your experiences up in a blog or article somewhere...

> 2. There are still spikes every about 100 ms, which is annoying. Has
> somebody an idea where this comes from?

It could be Linux kernel behavior. You can try with a real time kernel.

-Pieter
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev
