>It could be Linux kernel behavior. You can try with a real time kernel.
We are on a Windows 7 machine, Core i7-3770 CPU @ 4.40GHz, 16 GB RAM, 64
bit. So in our case it would be Windows kernel behavior. Unfortunately I
don't have a real-time kernel, but I will follow this hint.
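Since we are on Windows, the closest cheap experiment I can think of is
raising the multimedia timer resolution for the duration of the test. This
is only a sketch (the wrapper name RunTestWithHighTimerResolution is made
up, and whether this influences the ~100 ms spikes at all is exactly what
we would be testing):

#include <windows.h>   //timeBeginPeriod/timeEndPeriod come in via mmsystem.h
#pragma comment(lib, "winmm.lib")

int RunTestWithHighTimerResolution(void)
{
    //Request 1 ms timer resolution; the default scheduler tick on
    //Windows 7 is about 15.6 ms, which could plausibly show up as
    //periodic latency spikes.
    timeBeginPeriod(1);
    //... run the latency measurement here ...
    timeEndPeriod(1);  //always restore the previous resolution
    return 0;
}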
>Before tweaking the send calls, I'd advise making a *minimal* test
>case that shows the effect, in C. We can then see exactly where the
>latency spike is coming from. No point in trying to remove an effect
>we don't understand (in fact, counter productive, we need to
>understand the spike first.)
We sent you the C source code in the last message, as requested. Could you
reproduce the issue?
>It would be really interesting and useful to other people if you could
write your experiences up in a blog or article somewhere...
I have a problem, and it will also be a problem for others if the latency
of roughly the first 500 messages over inproc is too large. I am posting
to the list because this is the right place for such questions, and
because I think that if I can't find a solution, or at least the reason
for this behavior, it is likely to remain an (at least) medium-term
property of ZMQ.
If so (please correct me), that is the point where I would start thinking
about a ZMQ replacement in our software, because we can't live with this
behavior, and others should know about it too.
New findings:
* We had the compiler set to 32 bit. Switching to 64 bit gave us more
performance and reduced the latencies, which is good news. But the main
issue, the spikes and the latency ramp, is still there.
Summary so far:
* Setting the compiler to 64 bit reduced the latencies, but the main
issues, the spikes and the latency ramp, are still there.
* Inproc communication for both PUB/SUB and ROUTER/DEALER shows a latency
ramp over roughly the first 500 messages.
* The problem for ROUTER/DEALER is that sending 172 bytes inproc takes up
to 800us = 0.8ms (!!!) at the end of the ramp, versus 80-130us
before/after the ramp, with all optimizations applied (thread priority =
time critical, software pinned to a single core).
* The latency ramp appears whenever there is a delay between the send
calls, caused either by a delayMicroSeconds(100) (PUB/SUB) or by the
calls to free(ZmqReceiveData( requester )) (ROUTER/DEALER).
* For PUB/SUB the communication is OK with all optimizations turned on
(thread priority = time critical, single core). We do not yet know the
influence of more than two time-critical threads; maybe that will reduce
performance.
* In general, the first message has a large latency for both inproc
PUB/SUB and ROUTER/DEALER. For PUB/SUB the first message takes about
100us and the remaining messages 0.3-4.0us. A warm-up workaround we are
considering is sketched right after this list.
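As mentioned in the last bullet, the warm-up workaround we are considering
looks roughly like this. It is only a sketch: WarmUpPublisher is a made-up
helper, and it assumes the ramp is a per-connection warm-up effect that
throwaway traffic can absorb, which is not yet proven:

//Hypothetical warm-up: push throwaway two-frame messages on the matching
//topic so they travel the same inproc path as the real traffic. Our
//subscriber receives and discards them like any other command.
static void WarmUpPublisher(void* publisher, const char* topic, int count)
{
    for (int i = 0; i < count; i++)
    {
        zmq_send(publisher, topic, strlen(topic), ZMQ_SNDMORE);
        zmq_send(publisher, "WARMUP", 6, 0);
    }
}

Called as WarmUpPublisher(publisher, subscription, 500) right after
zmq_bind and before SEQ 1, it would absorb the ramp before the
time-critical traffic starts, if the warm-up hypothesis is correct.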
Hopefully somebody out there can give us some hints or advice on how to
proceed.
Please find attached the current source code with the defines used for
testing. It writes a file with the timings to C:\. It also has a
SPINNING_RECEIVE define, which turns the spinning receive in the receiver
thread on or off; when off, a plain blocking receive is used instead.
// ZMQ_Warmup.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
#include "zmq.h"
#include <process.h>
// For VS Performance analysis
// #include "VSPerf.h"
#define ZMQ_RAMP_PROBLEM
#define SPINNING_RECEIVE_ //trailing underscore disables it; rename to SPINNING_RECEIVE to enable the spinning receive
#define THREAD_PRIO_CRITICAL
#define SINGLE_CORE
void StartSubscriber(void* subscriberargs);
void StartReplier(void* context);
void* ZmqReceiveData (void *socket);
char* ZmqReceiveString (void *socket);
void* ZmqSpinningReceiveData (void *socket);
char* ZmqSpinningReceiveString (void *socket);
bool ZmqHasMore (void *socket);
typedef struct
{
void* context;
const char* subscription;
}subscriberargs;
typedef struct
{
const char* longstring;
int length;
}heavystruct;
HANDLE subscriberdone = CreateEvent( NULL, FALSE, FALSE, NULL );
HANDLE replierdone = CreateEvent( NULL, FALSE, FALSE, NULL );
HANDLE messagesreceived = CreateEvent( NULL, FALSE, FALSE, NULL );
FILE *fid = NULL;
const int numresults = 3000;
double timing[numresults];
LARGE_INTEGER frequency;
LARGE_INTEGER start;
LARGE_INTEGER stop;
void delayMicroSeconds( float microseconds )
{
__int64 timeElapsed;
__int64 timeStart;
__int64 timeFrequency;
QueryPerformanceFrequency( (LARGE_INTEGER*)(&timeFrequency) );
double timeToWait = (double)timeFrequency * (double)microseconds / 1000000.0;
QueryPerformanceCounter ( (LARGE_INTEGER*)(&timeStart) );
timeElapsed = timeStart;
//busy-wait until the requested number of microseconds has passed
while( ( timeElapsed - timeStart ) < (__int64)timeToWait )
{
QueryPerformanceCounter( (LARGE_INTEGER*)(&timeElapsed) );
}
}
short OpenLogFile( void)
{
errno_t e = fopen_s( &fid, "c:\\resultlog.txt", "w");
return 0;
}
short CloseLogFile( void)
{
fclose(fid);
return 0;
}
short LogMeasTiming( int indicator, double time)
{
timing[indicator] = time;
return 0;
}
short FlushLogFile()
{
for (int i = 0; i < numresults/3; i++)
{
fprintf(fid, "Loop %5d, %6.4f [us]\n", i, timing[i]);
}
fprintf(fid, "\n");
fprintf(fid, "********************* END OF SEQ 1 *************************\n");
fprintf(fid, "\n");
for (int i = numresults/3; i < 2 * numresults/3; i++)
{
fprintf(fid, "Loop %5d, %6.4f [us]\n", i, timing[i]);
}
fprintf(fid, "\n");
fprintf(fid, "********************* END OF SEQ 2 *************************\n");
fprintf(fid, "\n");
for (int i = 2*numresults/3; i < 3 * numresults/3; i++)
{
fprintf(fid, "Loop %5d, %6.4f [us]\n", i, timing[i]);
}
fprintf(fid, "\n");
fprintf(fid, "********************* END OF SEQ 3 *************************\n");
fprintf(fid, "\n");
return 0;
}
double LIToSecs( LARGE_INTEGER & L)
{
return ((double)L.QuadPart /(double)frequency.QuadPart) ;
}
double getElapsedTimeUs()
{
LARGE_INTEGER time;
time.QuadPart = stop.QuadPart - start.QuadPart;
return (LIToSecs( time) * 1000000) ;
}
int main(int argc, char* argv[])
{
Sleep(1000);
//OpenLogFile for the timing data
OpenLogFile();
#ifdef SINGLE_CORE
SetProcessAffinityMask(GetCurrentProcess(), 0x80);
//the thread mask must be a subset of the process affinity mask,
//otherwise SetThreadAffinityMask fails; pin everything to core 7
SetThreadAffinityMask(GetCurrentThread(), 0x80);
#endif
#ifdef THREAD_PRIO_CRITICAL
SetThreadPriority(GetCurrentThread(),
THREAD_PRIORITY_TIME_CRITICAL);
#endif
//SetThreadPriorityBoost(GetCurrentThread(), false);
//create context
void* context = zmq_ctx_new();
//initialize stopwatch
QueryPerformanceFrequency( &frequency ) ;
start.QuadPart = 0;
stop.QuadPart = 0;
//initaliase local variables
int send_suc = 0;
int retries = 0;
int sendid_suc = 0;
int sendaddress_suc = 0;
int send_data = 0;
int status = 0;
//data to send
const char* subscription = "TestSubscription";
const char* command =
"TestCommandWhichIsExtremLongAndDefinitveMoreThanskajflskhdafashkdgfksdjfkjsadfaksjhdflkjashgdlfjhsdöjakjhföasjkhföasuhgöiuwaefnbobfs29Characters"
;
subscriberargs subscriptiondata;
subscriptiondata.context = context;
subscriptiondata.subscription = subscription;
heavystruct data;
data.longstring = "SuperMegaLongStringThatIsReallyLong";
data.length = strlen(data.longstring);
//start subscriber thread
_beginthread( StartSubscriber, 0, (void*) &subscriptiondata);
//start replier thread
_beginthread( StartReplier, 0, context);
//create publisher
void* publisher = zmq_socket (context, ZMQ_PUB); //ZMQ_PUB
int linger = 0, buffsize = 20000;
//set the options for the PUB socket
int PubState = zmq_setsockopt ( publisher, ZMQ_LINGER, &linger,
sizeof (linger));
PubState |= zmq_setsockopt ( publisher, ZMQ_SNDBUF, &buffsize, sizeof
(buffsize));
PubState |= zmq_setsockopt ( publisher, ZMQ_RCVBUF, &buffsize, sizeof
(buffsize));
//wait till replier ready
WaitForSingleObject(replierdone, INFINITE);
//create requester
void* requester = zmq_socket (context, ZMQ_DEALER); //ZMQ_REQ
//Set options for the requester socket
status |= zmq_setsockopt ( requester, ZMQ_LINGER, &linger, sizeof (linger));
char identity [255] = "Identity";
status = zmq_setsockopt ( requester, ZMQ_IDENTITY, identity, strlen(
identity ) );
zmq_setsockopt(requester, ZMQ_RCVBUF, &buffsize, sizeof (buffsize));
zmq_setsockopt(requester, ZMQ_SNDBUF, &buffsize, sizeof (buffsize));
//wait till subscriber did the connect
WaitForSingleObject(subscriberdone, INFINITE);
status |= zmq_bind (publisher, "inproc://ZMQ_TEST_PUB_SUB" );
status |= zmq_connect(requester, "inproc://ZMQ_TEST_ROUTER_DEALER");
//If an error occurred, exit the program
if(PubState || status)
exit(-1);
// ******************************* SEQ 1 (ASYNC) *******************************
//send 500 multi-part messages to the subscriber and log the timing data
for (int i = 0; i < numresults/3; i++)
{
//start the timer
QueryPerformanceCounter(&start);
//send multipart message
sendaddress_suc = zmq_send (publisher, subscription,
strlen (subscription), ZMQ_SNDMORE);
send_suc = zmq_send (publisher, command, strlen (command),
ZMQ_SNDMORE);
send_data = zmq_send (publisher, (void*) &data, sizeof
(data), 0);
//stop the timer
QueryPerformanceCounter(&stop);
#ifdef ZMQ_RAMP_PROBLEM
//stands in for the other operations the real program performs after sending
delayMicroSeconds(100);
#endif
//Log timing
LogMeasTiming(i, getElapsedTimeUs());
}
Sleep(1000);
// ******************************* SEQ 2 (SYNC) *******************************
//send 500 multi-part messages to the replier/router
for (int i = numresults/3; i < 2*numresults/3; i++)
{
// For VS Performance analysis
//MarkProfile( i );
//start the timer
QueryPerformanceCounter(&start);
//send multipart message
sendaddress_suc = zmq_send (requester, "", 0, ZMQ_SNDMORE
);
send_suc = zmq_send (requester, command, strlen (command),
ZMQ_SNDMORE);
send_data = zmq_send (requester, (void*) &data, sizeof
(data), 0);
//stop the timer
QueryPerformanceCounter(&stop);
// For VS Performance analysis
//MarkProfile( i );
#ifdef ZMQ_RAMP_PROBLEM
//stands in for the other operations the real program performs after sending
//delayMicroSeconds(100); //--> the delay caused by the "free" calls below is enough
// For VS Performance analysis
//if(getElapsedTimeUs() > 700)
// CommentMarkProfile( i, L"HERE" );
//free the envelope first
free(ZmqReceiveData( requester )); //empty delimiter frame of the DEALER/ROUTER envelope
// get the rest of the message and free memory
free(ZmqReceiveData( requester));
#endif
if(i==1530) //one-off Sleep at message 1530 (measurement probe)
Sleep(1);
//Log timing
LogMeasTiming(i, getElapsedTimeUs());
}
//Sleep(1000);
// ******************************* SEQ 3 (ASYNC) *******************************
//send 500 multi-part messages to the subscriber and log the timing data
for (int i = 2* numresults/3; i < 3* numresults/3; i++)
{
//start the timer
QueryPerformanceCounter(&start);
//send multipart message
sendaddress_suc = zmq_send (publisher, subscription,
strlen (subscription), ZMQ_SNDMORE);
send_suc = zmq_send (publisher, command, strlen (command),
ZMQ_SNDMORE);
send_data = zmq_send (publisher, (void*) &data, sizeof
(data), 0);
//stop the timer
QueryPerformanceCounter(&stop);
#ifdef ZMQ_RAMP_PROBLEM
//stands in for the other operations the real program performs after sending
delayMicroSeconds(100);
#endif
//Log timing
LogMeasTiming(i, getElapsedTimeUs());
}
//send final multipart message to subscriber
sendaddress_suc = zmq_send (publisher, subscription, strlen
(subscription), ZMQ_SNDMORE);
send_suc = zmq_send (publisher, "Ende", 4, ZMQ_SNDMORE);
send_data = zmq_send (publisher, (void*) &data, sizeof (data), 0);
//wait till all data has been received by subscriber
WaitForSingleObject(messagesreceived, INFINITE);
//close pub and requester/dealer
status |= zmq_close (publisher);
publisher = NULL;
status |= zmq_close (requester);
requester = NULL;
//destroy context
zmq_ctx_term(context);
//flush log data and close file
FlushLogFile();
CloseLogFile();
return 0;
}
void StartSubscriber(void* subscriberarguments)
{
#ifdef SINGLE_CORE
SetThreadAffinityMask(GetCurrentThread(), 0x80); //must stay within the process affinity mask (core 7)
#endif
#ifdef THREAD_PRIO_CRITICAL
SetThreadPriority(GetCurrentThread(),
THREAD_PRIORITY_TIME_CRITICAL);
#endif
//SetThreadPriorityBoost(GetCurrentThread(), false);
subscriberargs *subscriptiondata = (subscriberargs *)
subscriberarguments;
void* subscriber = zmq_socket (subscriptiondata->context, ZMQ_SUB
);
int linger = 0, buffsize = 20000;
int HighWaterMark = 1; //receive high water mark (default 1000); declared but never applied via ZMQ_RCVHWM
//Set Linger
int zmqstatus = zmq_setsockopt ( subscriber, ZMQ_LINGER, &linger,
sizeof (linger));
//set the subscription
zmqstatus |= zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,
subscriptiondata->subscription, strlen(subscriptiondata->subscription));
zmqstatus |= zmq_connect (subscriber, "inproc://ZMQ_TEST_PUB_SUB"
);
SetEvent(subscriberdone);
bool endsubscriber = false;
while (!endsubscriber)
{
void* data = 0;
char *receivedsubscription = ZmqReceiveString(subscriber);
//if nothing received or false received
if(receivedsubscription == NULL ||
strcmp(receivedsubscription, subscriptiondata->subscription) != 0)
{
//free the partial receive and stop
free(receivedsubscription);
break;
}
//then the command
char *command = ZmqReceiveString(subscriber);
//check if it is the last message (guard against a NULL receive)
if(command != NULL && strcmp(command, "Ende") == 0)
endsubscriber = true;
//Check if one more message is coming
if(ZmqHasMore(subscriber))
{
//receive the last frame
data = ZmqReceiveData(subscriber);
}
//free memory
free(receivedsubscription);
free(command);
free(data);
}
//close the sub socket
zmqstatus |= zmq_close (subscriber);
subscriber = NULL;
//tell main thread that subscriber is closed
SetEvent(messagesreceived);
return;
}
void StartReplier(void* replierarguments)
{
#ifdef SINGLE_CORE
SetThreadAffinityMask(GetCurrentThread(), 0x80); //must stay within the process affinity mask (core 7)
#endif
#ifdef THREAD_PRIO_CRITICAL
SetThreadPriority(GetCurrentThread(),
THREAD_PRIORITY_TIME_CRITICAL);
#endif
//SetThreadPriorityBoost(GetCurrentThread(), false);
//create replier/router socket
void* replier = zmq_socket (replierarguments, ZMQ_ROUTER);
int linger = 0, buffsize = 20000;
//set socket options
int zmqstatus = zmq_setsockopt ( replier, ZMQ_LINGER, &linger,
sizeof (linger));
zmqstatus |= zmq_setsockopt ( replier, ZMQ_RCVBUF, &buffsize,
sizeof (buffsize));
zmqstatus |= zmq_setsockopt ( replier, ZMQ_SNDBUF, &buffsize,
sizeof (buffsize));
//bind replier socket
zmqstatus |= zmq_bind (replier, "inproc://ZMQ_TEST_ROUTER_DEALER"
);
//tell main thread replier is set
SetEvent(replierdone);
int counter = 0;
//run for 500 messages
while (counter < numresults/3)
{
void* Data = 0;
//first get identity
#ifdef SPINNING_RECEIVE
char *identity = ZmqSpinningReceiveString(replier);
#else
char *identity = ZmqReceiveString(replier);
#endif
//if nothing received or false received
if(identity == NULL)
{
free(identity);
break;
}
//free the envelope
#ifdef SPINNING_RECEIVE
free(ZmqSpinningReceiveString(replier));
#else
free(ZmqReceiveString(replier));
#endif
//then get the command
#ifdef SPINNING_RECEIVE
char *command = ZmqSpinningReceiveString(replier);
#else
char *command = ZmqReceiveString(replier);
#endif
//check is there is more
if(ZmqHasMore(replier))
{
//receive final message
#ifdef SPINNING_RECEIVE
Data = ZmqSpinningReceiveData(replier);
#else
Data = ZmqReceiveData(replier);
#endif
}
//Sleep(1);
//send the reply
zmq_send(replier, "Identity", strlen("Identity"),
ZMQ_SNDMORE);
zmq_send(replier, "", 0, ZMQ_SNDMORE);
zmq_send(replier, "ReceiveComplete", 15, 0);
//Free memory
free(identity);
free(command);
free(Data);
//counts the messages received
counter++;
}
//close replier
zmqstatus |= zmq_close (replier);
replier = NULL;
return;
}
void* ZmqSpinningReceiveData (void *socket) {
zmq_msg_t msg;
int rc = zmq_msg_init (&msg);
//assert (rc == 0);
//spin on the non-blocking receive until a message arrives;
//give up on any error other than "nothing there yet" (EAGAIN)
rc = -1;
do
{
rc = zmq_msg_recv (&msg, socket, ZMQ_DONTWAIT);
//Sleep(0);
} while( rc == -1 && zmq_errno() == EAGAIN );
if(rc==-1)
{
zmq_msg_close (&msg);
return NULL;
}
void *rcvdata = zmq_msg_data(&msg);
void* buffer = malloc( rc );
memcpy_s( buffer, rc, rcvdata, rc);
zmq_msg_close (&msg);
return buffer;
}
void* ZmqReceiveData (void *socket) {
zmq_msg_t msg;
int rc = zmq_msg_init (&msg);
//assert (rc == 0);
/* Block until a message is available to be received from socket
*/
rc = zmq_msg_recv (&msg, socket, 0);
if(rc==-1)
{
zmq_msg_close (&msg);
return NULL;
}
void *rcvdata = zmq_msg_data(&msg);
void* buffer = malloc( rc );
memcpy_s( buffer, rc, rcvdata, rc);
zmq_msg_close (&msg);
return buffer;
}
char* ZmqSpinningReceiveString (void *socket) {
zmq_msg_t msg;
int rc = zmq_msg_init (&msg);
/* Spin until a message is available to be received from socket;
give up on any error other than "nothing there yet" (EAGAIN) */
rc = -1;
do
{
rc = zmq_msg_recv (&msg, socket, ZMQ_DONTWAIT);
//Sleep(0);
} while( rc == -1 && zmq_errno() == EAGAIN );
//assert (rc != -1);
if(rc==-1)
{
zmq_msg_close (&msg);
//Wait some time to have a look at the message before the .exe gets closed
return NULL;
}
void* rcvdata = zmq_msg_data(&msg);
char* buffer = (char*) malloc( rc + 1 );
memcpy_s(buffer, rc, rcvdata, rc);
buffer[rc] = '\0';
zmq_msg_close (&msg);
return buffer;
}
char* ZmqReceiveString (void *socket) {
zmq_msg_t msg;
int rc = zmq_msg_init (&msg);
/* Block until a message is available to be received from socket
*/
rc = zmq_msg_recv (&msg, socket, 0);
//assert (rc != -1);
if(rc==-1)
{
zmq_msg_close (&msg);
//Wait some time to have a look at the message before the .exe gets closed
return NULL;
}
void* rcvdata = zmq_msg_data(&msg);
char* buffer = (char*) malloc( rc + 1 );
memcpy_s(buffer, rc, rcvdata, rc);
buffer[rc] = '\0';
zmq_msg_close (&msg);
return buffer;
}
bool ZmqHasMore (void *socket)
{
int more = 0; //multipart detection; ZMQ_RCVMORE is an int option in ZMQ 3.x/4.x
size_t more_size = sizeof (more);
zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
return more != 0;
}
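For completeness, the non-spinning receive we compare against could also
be written with zmq_poll and a timeout instead of a plain blocking
zmq_msg_recv. Again a sketch only (ZmqPollingReceiveData is a name
invented here, modeled on ZmqReceiveData above):

//Sketch: block in zmq_poll with a timeout instead of busy-looping on
//ZMQ_DONTWAIT. Returns a malloc'd copy like ZmqReceiveData, or NULL on
//timeout/error, so callers can free() the result the same way.
void* ZmqPollingReceiveData (void *socket, long timeout_ms)
{
    zmq_pollitem_t item = { socket, 0, ZMQ_POLLIN, 0 };
    if (zmq_poll (&item, 1, timeout_ms) <= 0)
        return NULL; //timeout (0) or error (-1)
    zmq_msg_t msg;
    zmq_msg_init (&msg);
    int rc = zmq_msg_recv (&msg, socket, 0);
    if (rc == -1)
    {
        zmq_msg_close (&msg);
        return NULL;
    }
    void* buffer = malloc (rc);
    memcpy_s (buffer, rc, zmq_msg_data (&msg), rc);
    zmq_msg_close (&msg);
    return buffer;
}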
From: Pieter Hintjens <[email protected]>
To: ZeroMQ development list <[email protected]>
Date: 10.10.2015 01:45
Subject: Re: [zeromq-dev] Bad ZMQ latency on first 500 messages per connection
Sent by: [email protected]
On Fri, Oct 9, 2015 at 11:02 AM, <[email protected]> wrote:
> Our findings so far:
It would be really interesting and useful to other people if you could
write your experiences up in a blog or article somewhere...
> 2. There are still spikes every about 100 ms, which is annoying. Has
> somebody an idea where this comes from?
It could be Linux kernel behavior. You can try with a real time kernel.
-Pieter
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev