In the given test project we have a pub/sub pattern and a router/dealer
pattern. In this test we send, in "SEQ 1", 500 multipart messages over the
pub/sub, then in "SEQ 2" 500 multipart messages over the router/dealer
(to do something else; this part is not required), and then in "SEQ 3"
another 500 multipart messages over the pub/sub.
There is also a flag "ZMQ_RAMP_PROBLEM" that simulates our problem: a
latency ramp at the beginning of "SEQ 1" up to approximately message 500,
where it drops off; the same ramp appears again from approximately message
1000 on, i.e. in SEQ 3 of the result log. If the #define is deactivated,
the messages are sent continuously and very fast, while activating the
#define adds a 100 µs delay after every multipart message and brings up
the ramp until message 500. For better understanding there is also a
resultfile.log file provided in which all timings of the pub/sub
connection are logged.
There is also a flag "SPINNING_RECEIVE" that turns on spinning receive
(the ZMQ_DONTWAIT/NOWAIT option).
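For clarity, this is roughly what the flag toggles; a minimal sketch (the
full versions with the copy-out step are ZmqSpinningReceiveData and
ZmqReceiveData in the code below):

//sketch: what the two receive styles reduce to
static int ReceiveFrame (void* socket, zmq_msg_t* msg)
{
zmq_msg_init (msg);
#ifdef SPINNING_RECEIVE
//busy-poll: retry the non-blocking receive until a frame arrives
int rc = -1;
while (rc == -1)
rc = zmq_msg_recv (msg, socket, ZMQ_DONTWAIT);
return rc;
#else
//block inside libzmq until a frame arrives
return zmq_msg_recv (msg, socket, 0);
#endif
}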
Our findings so far:
1. In C++ there is the possibility to restrict the process and its
threads to a single core (see the sketch after this list). We guess that
this removes the synchronization/handling time between multiple cores. As
a result there is no latency ramp (as explained before), at least for
pub/sub; just the first few messages still have a latency, but not the
first 500 of every connection. We are still investigating this, but it
might be the solution.
2. There are still spikes about every 100 ms, which is annoying. Does
anybody have an idea where these come from?
3. Setting the CPU frequency to 100% makes everything faster, but it is
not the reason for the ramp.
4. Setting "THREAD_PRIORITY_TIME_CRITICAL" also gave us more performance,
and for pub/sub the ramp disappeared. But it is still there for
DEALER/ROUTER.
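This is the pinning from findings 1 and 4 as we do it (the same calls
appear in the code below); a minimal sketch for Windows, assuming core 7
exists (affinity mask 0x80, adjust for your machine):

#include <windows.h>

//pin the process and the calling thread to a single core (here core 7)
SetProcessAffinityMask (GetCurrentProcess (), 0x80);
SetThreadAffinityMask (GetCurrentThread (), 0x80);
//raise the scheduling priority as far as possible
SetThreadPriority (GetCurrentThread (), THREAD_PRIORITY_TIME_CRITICAL);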
Question: Where does the ramp come from?
Code of "ZMQ_Warmup.cpp" follows. It can be compiled as a console
application; it just needs zmq.
// ZMQ_Warmup.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
#include "zmq.h"
#include <process.h>
// For VS Performance analysis
// #include "VSPerf.h"
#define ZMQ_RAMP_PROBLEM
#define SPINNING_RECEIVE
void StartSubscriber(void* subscriberargs);
void StartReplier(void* context);
void* ZmqReceiveData (void *socket);
char* ZmqReceiveString (void *socket);
void* ZmqSpinningReceiveData (void *socket);
char* ZmqSpinningReceiveString (void *socket);
bool ZmqHasMore (void *socket);
typedef struct
{
void* context;
const char* subscription;
}subscriberargs;
typedef struct
{
const char* longstring;
int length;
}heavystruct;
HANDLE subscriberdone = CreateEvent( NULL, FALSE, FALSE, NULL );
HANDLE replierdone = CreateEvent( NULL, FALSE, FALSE, NULL );
HANDLE messagesreceived = CreateEvent( NULL, FALSE, FALSE, NULL );
FILE *fid = NULL;
const int numresults = 1500;
double timing[numresults];
int outbufsize[numresults];
LARGE_INTEGER frequency;
LARGE_INTEGER start;
LARGE_INTEGER stop;
void delayMicroSeconds( float microseconds )
{
__int64 timeElapsed;
__int64 timeStart;
__int64 timeDelta;
QueryPerformanceFrequency( (LARGE_INTEGER*)(&timeDelta) );
double timeToWait = (double)timeDelta * (double)microseconds / 1000000.0;
QueryPerformanceCounter( (LARGE_INTEGER*)(&timeStart) );
timeElapsed = timeStart;
//busy-wait until the requested number of microseconds has elapsed
while( ( timeElapsed - timeStart ) < (__int64)timeToWait )
{
QueryPerformanceCounter( (LARGE_INTEGER*)(&timeElapsed) );
}
}
short OpenLogFile( void)
{
errno_t e = fopen_s( &fid, "c:\\resultlog.txt", "w");
return 0;
}
short CloseLogFile( void)
{
fclose(fid);
return 0;
}
short LogMeasTiming( int indicator, double time)
{
timing[indicator] = time;
return 0;
}
short LogMeasOutBufSize( int indicator, int BufSize)
{
outbufsize[indicator] = BufSize;
return 0;
}
short FlushLogFile()
{
for (int i = 0; i < numresults/3; i++)
{
fprintf(fid, "Loop %5d, %6.2f [us], %5d\n", i, timing[i], outbufsize[i]);
}
fprintf(fid, "\n");
fprintf(fid, "********************* END OF SEQ 1 *************************\n");
fprintf(fid, "\n");
for (int i = numresults/3; i < 2 * numresults/3; i++)
{
fprintf(fid, "Loop %5d, %6.2f [us], %5d\n", i, timing[i], outbufsize[i]);
}
fprintf(fid, "\n");
fprintf(fid, "********************* END OF SEQ 2 *************************\n");
fprintf(fid, "\n");
for (int i = 2*numresults/3; i < 3 * numresults/3; i++)
{
fprintf(fid, "Loop %5d, %6.2f [us], %5d\n", i, timing[i], outbufsize[i]);
}
fprintf(fid, "\n");
fprintf(fid, "********************* END OF SEQ 3 *************************\n");
fprintf(fid, "\n");
return 0;
}
double LIToSecs( LARGE_INTEGER & L)
{
return ((double)L.QuadPart /(double)frequency.QuadPart) ;
}
double getElapsedTimeUs()
{
LARGE_INTEGER time;
time.QuadPart = stop.QuadPart - start.QuadPart;
return (LIToSecs( time) * 1000000) ;
}
int main(int argc, char* argv[])
{
Sleep(1000);
//OpenLogFile for the timing data
OpenLogFile();
SetProcessAffinityMask(GetCurrentProcess(), 0x80);
SetThreadAffinityMask(GetCurrentThread(), 0x80);
SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL);
//SetThreadPriorityBoost(GetCurrentThread(), false);
//create context
void* context = zmq_ctx_new();
//initialize stopwatch
QueryPerformanceFrequency( &frequency ) ;
start.QuadPart = 0;
stop.QuadPart = 0;
//initialise local variables
int send_suc = 0;
int retries = 0;
int sendid_suc = 0;
int sendaddress_suc = 0;
int send_data = 0;
int status = 0;
//data to send
const char* subscription = "TestSubscription";
const char* command = "TestCommandWhichIsExtremLongAndDefinitveMoreThanskajflskhdafashkdgfksdjfkjsadfaksjhdflkjashgdlfjhsdöjakjhföasjkhföasuhgöiuwaefnbobfs29Characters";
subscriberargs subscriptiondata;
subscriptiondata.context = context;
subscriptiondata.subscription = subscription;
heavystruct data;
data.longstring = "SuperMegaLongStringThatIsReallyLong";
data.length = strlen(data.longstring);
//start subscriber thread
_beginthread( StartSubscriber, 0, (void*) &subscriptiondata);
//start replier thread
_beginthread( StartReplier, 0, context);
//create publisher
void* publisher = zmq_socket (context, ZMQ_PUB); //ZMQ_PUB
int linger = 0, buffsize = 20000;
//set the options for the PUB socket
int PubState = zmq_setsockopt ( publisher, ZMQ_LINGER, &linger,
sizeof (linger));
PubState |= zmq_setsockopt ( publisher, ZMQ_SNDBUF, &buffsize, sizeof
(buffsize));
PubState |= zmq_setsockopt ( publisher, ZMQ_RCVBUF, &buffsize, sizeof
(buffsize));
//wait till replier ready
WaitForSingleObject(replierdone, INFINITE);
//create requester
void* requester = zmq_socket (context, ZMQ_DEALER); //ZMQ_REQ
//Set options for the requester socket
int s = zmq_setsockopt ( requester, ZMQ_LINGER, &linger, sizeof
(linger));
char identity [255] = "Identity";
status = zmq_setsockopt ( requester, ZMQ_IDENTITY, identity, strlen(
identity ) );
zmq_setsockopt(requester, ZMQ_RCVBUF, &buffsize, sizeof (buffsize));
zmq_setsockopt(requester, ZMQ_SNDBUF, &buffsize, sizeof (buffsize));
//wait till subscriber did the connect
WaitForSingleObject(subscriberdone, INFINITE);
status |= zmq_bind (publisher, "inproc://ZMQ_TEST_PUB_SUB" );
status |= zmq_connect(requester, "inproc://ZMQ_TEST_ROUTER_DEALER");
//If an error occurred, exit the program
if(PubState)
exit(-1);
// ******************************* SEQ 1 (ASYNC) *******************************
//send 500 multi-part messages to the subscriber and log timing data
for (int i = 0; i < numresults/3; i++)
{
//start the timer
QueryPerformanceCounter(&start);
//send multipart message
sendaddress_suc = zmq_send (publisher, subscription,
strlen (subscription), ZMQ_SNDMORE);
send_suc = zmq_send (publisher, command, strlen (command),
ZMQ_SNDMORE);
send_data = zmq_send (publisher, (void*) &data, sizeof
(data), 0);
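//note: this sends the raw struct bytes, i.e. the char pointer plus the
//length, not the string contents; that is only meaningful here because
//sender and receiver share one process (inproc transport)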
//stop the timer
QueryPerformanceCounter(&stop);
#ifdef ZMQ_RAMP_PROBLEM
//equals post-send operations elsewhere in the program
delayMicroSeconds(100);
#endif
//Log timing
LogMeasTiming(i, getElapsedTimeUs());
}
Sleep(1000);
// ******************************* SEQ 2 (SYNC) *******************************
//send 500 multi-part messages to the replier/router
for (int i = numresults/3; i < 2*numresults/3; i++)
{
// For VS Performance analysis
//MarkProfile( i );
//send multipart message
sendaddress_suc = zmq_send (requester, "", 0, ZMQ_SNDMORE
);
send_suc = zmq_send (requester, command, strlen (command),
ZMQ_SNDMORE);
if(send_suc == -1)
{
//kept as a breakpoint anchor for debugging failed sends
}
//start the timer
QueryPerformanceCounter(&start);
send_data = zmq_send (requester, (void*) &data, sizeof
(data), 0);
//stop the timer
QueryPerformanceCounter(&stop);
// For VS Performance analysis
//MarkProfile( i );
#ifdef ZMQ_RAMP_PROBLEM
//equals post-send operations elsewhere in the program
delayMicroSeconds(100);
#endif
//Log timing
LogMeasTiming(i, getElapsedTimeUs());
// For VS Performance analysis
//if(getElapsedTimeUs() > 700)
// CommentMarkProfile( i, L"HERE" );
//free the envelope first (DEALER/REP combination envelope)
free(ZmqReceiveData( requester ));
//ZmqReceiveData( requester );
// get the rest of the message and free memory
free(ZmqReceiveData( requester ));
//ZmqReceiveData( requester );
}
//Sleep(1000);
// ******************************* SEQ 3 (ASYNC) *******************************
//send 500 multi-part messages to the subscriber and log timing data
for (int i = 2* numresults/3; i < 3* numresults/3; i++)
{
//start the timer
QueryPerformanceCounter(&start);
//send multipart message
sendaddress_suc = zmq_send (publisher, subscription,
strlen (subscription), ZMQ_SNDMORE);
send_suc = zmq_send (publisher, command, strlen (command),
ZMQ_SNDMORE);
send_data = zmq_send (publisher, (void*) &data, sizeof
(data), 0);
//stop the timer
QueryPerformanceCounter(&stop);
#ifdef ZMQ_RAMP_PROBLEM
//equals post-send operations elsewhere in the program
delayMicroSeconds(100);
#endif
//Log timing
LogMeasTiming(i, getElapsedTimeUs());
}
//send final multipart message to subscriber
sendaddress_suc = zmq_send (publisher, subscription, strlen
(subscription), ZMQ_SNDMORE);
send_suc = zmq_send (publisher, "Ende", 4, ZMQ_SNDMORE);
send_data = zmq_send (publisher, (void*) &data, sizeof (data), 0);
//wait till all data has been received by subscriber
WaitForSingleObject(messagesreceived, INFINITE);
//close pub and requester/dealer
status |= zmq_close (publisher);
publisher = NULL;
status |= zmq_close (requester);
requester = NULL;
//destroy context
zmq_ctx_term(context);
//flush log data and close file
FlushLogFile();
CloseLogFile();
return 0;
}
void StartSubscriber(void* subscriberarguments)
{
SetThreadAffinityMask(GetCurrentThread(), 0x80);
SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL);
//SetThreadPriorityBoost(GetCurrentThread(), false);
subscriberargs *subscriptiondata = (subscriberargs *)
subscriberarguments;
void* subscriber = zmq_socket (subscriptiondata->context, ZMQ_SUB
);
int linger = 0, buffsize = 20000;
int HighWaterMark = 1; //inbound high-water mark (default 1000); note: declared but never applied to the socket
//Set Linger
int zmqstatus = zmq_setsockopt ( subscriber, ZMQ_LINGER, &linger,
sizeof (linger));
//set the subscription
zmqstatus |= zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,
subscriptiondata->subscription, strlen(subscriptiondata->subscription));
zmqstatus |= zmq_connect (subscriber, "inproc://ZMQ_TEST_PUB_SUB"
);
SetEvent(subscriberdone);
bool endsubscriber = false;
while (!endsubscriber)
{
void* data = 0;
char *receivedsubscription = ZmqReceiveString(subscriber);
//if nothing was received or the wrong subscription arrived
if(receivedsubscription == NULL ||
strcmp(receivedsubscription, subscriptiondata->subscription) != 0)
{
//free and stop
free(receivedsubscription);
break;
}
//then the command
char *command = ZmqReceiveString(subscriber);
//check if it is the last message
if(command != NULL && strcmp(command, "Ende") == 0)
endsubscriber = true;
//Check if one more message is coming
if(ZmqHasMore(subscriber))
{
//receive the last frame
data = ZmqReceiveData(subscriber);
}
//free memory
free(receivedsubscription);
free(command);
free(data);
}
//close the sub socket
zmqstatus |= zmq_close (subscriber);
subscriber = NULL;
//tell main thread that subscriber is closed
SetEvent(messagesreceived);
return;
}
void StartReplier(void* replierarguments)
{
SetThreadAffinityMask(GetCurrentThread(), 0x80);
SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL);
//SetThreadPriorityBoost(GetCurrentThread(), false);
//create replier/router socket
void* replier = zmq_socket (replierarguments, ZMQ_ROUTER);
int linger = 0, buffsize = 20000;
//set socket options
int zmqstatus = zmq_setsockopt ( replier, ZMQ_LINGER, &linger,
sizeof (linger));
zmqstatus |= zmq_setsockopt ( replier, ZMQ_RCVBUF, &buffsize,
sizeof (buffsize));
zmqstatus |= zmq_setsockopt ( replier, ZMQ_SNDBUF, &buffsize,
sizeof (buffsize));
//bind replier socket
zmqstatus |= zmq_bind (replier, "inproc://ZMQ_TEST_ROUTER_DEALER"
);
//tell main thread replier is set
SetEvent(replierdone);
int counter = 0;
//run for 500 messages
while (counter < numresults/3)
{
void* Data = 0;
//first get identity
#ifdef SPINNING_RECEIVE
char *identity = ZmqSpinningReceiveString(replier);
#else
char *identity = ZmqReceiveString(replier);
#endif
//if nothing was received
if(identity == NULL)
{
free(identity);
break;
}
//free the envelope
#ifdef SPINNING_RECEIVE
free(ZmqSpinningReceiveString(replier));
#else
free(ZmqReceiveString(replier));
#endif
//then get the command
#ifdef SPINNING_RECEIVE
char *command = ZmqSpinningReceiveString(replier);
#else
char *command = ZmqReceiveString(replier);
#endif
//check if there is more
if(ZmqHasMore(replier))
{
//receive final message
#ifdef SPINNING_RECEIVE
Data = ZmqSpinningReceiveData(replier);
#else
Data = ZmqReceiveData(replier);
#endif
}
//Sleep(1);
//send the reply
zmq_send(replier, "Identity", strlen("Identity"),
ZMQ_SNDMORE);
zmq_send(replier, "", 0, ZMQ_SNDMORE);
zmq_send(replier, "ReceiveComplete", 15, 0);
//Free memory
free(identity);
free(command);
free(Data);
//counts the messages received
counter++;
}
//close replier
zmqstatus |= zmq_close (replier);
replier = NULL;
return;
}
void* ZmqSpinningReceiveData (void *socket) {
zmq_msg_t msg;
int rc = zmq_msg_init (&msg);
//assert (rc == 0);
rc = -1;
while( rc == -1 )
{
rc = zmq_msg_recv (&msg, socket, ZMQ_DONTWAIT);
//Sleep(0);
}
if(rc==-1)
{
zmq_msg_close (&msg);
return NULL;
}
void *rcvdata = zmq_msg_data(&msg);
void* buffer = malloc( rc );
memcpy_s( buffer, rc, rcvdata, rc);
zmq_msg_close (&msg);
return buffer;
}
void* ZmqReceiveData (void *socket) {
zmq_msg_t msg;
int rc = zmq_msg_init (&msg);
//assert (rc == 0);
/* Block until a message is available to be received from socket
*/
rc = zmq_msg_recv (&msg, socket, 0);
if(rc==-1)
{
zmq_msg_close (&msg);
return NULL;
}
void *rcvdata = zmq_msg_data(&msg);
void* buffer = malloc( rc );
memcpy_s( buffer, rc, rcvdata, rc);
zmq_msg_close (&msg);
return buffer;
}
char* ZmqSpinningReceiveString (void *socket) {
zmq_msg_t msg;
int rc = zmq_msg_init (&msg);
/* Block until a message is available to be received from socket
*/
rc = -1;
while( rc == -1 )
{
rc = zmq_msg_recv (&msg, socket, ZMQ_DONTWAIT);
//Sleep(0);
}
//assert (rc != -1);
if(rc==-1)
{
zmq_msg_close (&msg);
//Wait some time to have a look at the message before the .exe gets closed
return NULL;
}
void* rcvdata = zmq_msg_data(&msg);
char* buffer = (char*) malloc( rc + 1 );
memcpy_s(buffer, rc, rcvdata, rc);
buffer[rc] = '\0';
zmq_msg_close (&msg);
return buffer;
}
char* ZmqReceiveString (void *socket) {
zmq_msg_t msg;
int rc = zmq_msg_init (&msg);
/* Block until a message is available to be received from socket
*/
rc = zmq_msg_recv (&msg, socket, 0);
//assert (rc != -1);
if(rc==-1)
{
zmq_msg_close (&msg);
//Wait some time to have a look at the message before the .exe gets closed
return NULL;
}
void* rcvdata = zmq_msg_data(&msg);
char* buffer = (char*) malloc( rc + 1 );
memcpy_s(buffer, rc, rcvdata, rc);
buffer[rc] = '\0';
zmq_msg_close (&msg);
return buffer;
}
bool ZmqHasMore (void *socket)
{
//Multipart detection; note: since ZMQ 3.x the ZMQ_RCVMORE option is an
//int, not an int64_t
int more = 0;
size_t more_size = sizeof (more);
zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
return more != 0;
}
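As Pieter suggests in the thread below, a stripped-down C-only test case
may be the next step. A minimal sketch of such a test, assuming libzmq
4.x, inproc transport and a single thread (for inproc the bind must
happen before the connect):

// minimal_latency.c : single-threaded inproc PUB/SUB latency probe (sketch)
#include <zmq.h>
#include <stdio.h>
#include <windows.h>

int main(void)
{
void* ctx = zmq_ctx_new();
void* pub = zmq_socket(ctx, ZMQ_PUB);
void* sub = zmq_socket(ctx, ZMQ_SUB);
zmq_setsockopt(sub, ZMQ_SUBSCRIBE, "", 0); //subscribe to everything
zmq_bind(pub, "inproc://lat"); //inproc: bind first...
zmq_connect(sub, "inproc://lat"); //...then connect
LARGE_INTEGER f, t0, t1;
QueryPerformanceFrequency(&f);
char buf[64];
for (int i = 0; i < 1500; i++)
{
QueryPerformanceCounter(&t0);
zmq_send(pub, "x", 1, 0);
int rc = zmq_recv(sub, buf, sizeof(buf), 0); //blocking receive
QueryPerformanceCounter(&t1);
if (rc >= 0)
printf("%d %.2f us\n", i,
(double)(t1.QuadPart - t0.QuadPart) * 1000000.0 / (double)f.QuadPart);
}
zmq_close(sub);
zmq_close(pub);
zmq_ctx_term(ctx);
return 0;
}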
From: Pieter Hintjens <[email protected]>
To: ZeroMQ development list <[email protected]>
Cc: [email protected]
Date: 30.09.2015 14:50
Subject: Re: [zeromq-dev] Bad ZMQ latency on first 500 messages per
connection
Sent by: [email protected]
Before tweaking the send calls, I'd advise making a *minimal* test
case that shows the effect, in C. We can then see exactly where the
latency spike is coming from. No point in trying to remove an effect
we don't understand (in fact, counterproductive; we need to
understand the spike first.)
On Wed, Sep 30, 2015 at 8:31 AM, <[email protected]> wrote:
> We also agree that there must be a buffer somewhere in the TCP stack or
> OS/memory (page allocation)/scheduling. It becomes clear, like Ben
> mentioned, when the sender sends a lot more messages than the receiver
> receives.
>
> A time delay for the TCP connection could be a reason for that, as Pieter
> asked. But in our scenario we do the zmq_connect/zmq_bind at the
> beginning, which is not part of the measured results.
> We use inproc communication with ZMQ 4.0.4 in a C++ project with
> "zmq_send".
>
> All message data we have is memory-aligned structures (memory blocks).
> Does this mean we could also utilize "zmq_send_const"? We could then
> eliminate at least one copy operation, if I am right.
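(For reference, zmq_send_const() covers exactly this case: it sends a
buffer that libzmq references instead of copying, so the memory must stay
valid and unmodified until delivery completes. A minimal sketch, assuming
the struct lives in static storage and using the publisher socket from
the test code above:

//hypothetical zero-copy variant of the publisher send above
static const heavystruct constdata = { "SuperMegaLongStringThatIsReallyLong", 35 }; //35 = strlen of the string
//libzmq keeps a reference to &constdata instead of copying it
zmq_send_const (publisher, &constdata, sizeof (constdata), 0);
)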
>
>
>
>> Date: Tue, 29 Sep 2015 09:44:19 +0200
>> From: Pieter Hintjens <[email protected]>
>> Subject: Re: [zeromq-dev] Bad ZMQ latency on first 500 messages per
>> connection
>> To: ZeroMQ development list <[email protected]>
>> Message-ID:
>>
>> <CADL5_sgpYKi=k2doeObsrAkmGbfzv6SMcfe=q2qwp0_zcoa...@mail.gmail.com>
>> Content-Type: text/plain; charset=UTF-8
>
>>
>> Have you excluded the cost of establishing the TCP connection itself
>> (something around 10 msec, depending on the setup)?
>>
>> On Tue, Sep 29, 2015 at 8:49 AM, Ben Kloosterman <[email protected]>
>> wrote:
>>> "As can be seen, sender and receiver throughput's differ at the
beginning
>>> of
>>> the test. This seems to suggest that there is some kind of buffering
>>> involved on the lower layers of the stack"
>>>
>>> Nagle is turned off but it looks like it keeps increasing some sort
of
>>> buffer in the tcp stack.. could also be OS / memory ( page allocation)
/
>>> Scheduling priority.
>>>
>>> Note in those 500 messages the sender was sending a lot more messages
,
>>> than
>>> the receiver was receiving so a buffer would be growing somewhere.
>>>
>>> Ben
>>>
>>> On Tue, Sep 29, 2015 at 4:35 PM, <[email protected]>
>>> wrote:
>>>>
>>>> Hi all,
>>>>
>>>> when sending many messages at once there is a known latency issue: the
>>>> first 500 messages are sent with a high latency, shown in the graph in
>>>> the whitepaper http://zeromq.org/whitepapers:measuring-performance,
>>>> before it gets to very high speed messaging.
>>>>
>>>> The author summarizes:
>>>> * For latency: ... the latency for the first 500 messages is quite
>>>> poor (up to 1,500 microseconds); however, afterwards it stabilises at
>>>> approximately 150 microseconds, with occasional peaks up to 400
>>>> microseconds.
>>>> * For throughput: This seems to suggest that there is some kind of
>>>> buffering involved on the lower layers of the stack.
>>>>
>>>> In our scenario, we use a Dealer/Router connection, which is in fact
>>>> like REQ/REP.
>>>>
>>>> Question:
>>>> Why do the first 500 messages have this latency issue? What is the
>>>> reason for this, and how could we solve it? We require a
>>>> high-performance application even for the first messages.
>>>>
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev