Hi Pieter,
Please find attached the test code.
*Test Description:*
send_process: - Gets the time before sending a message and sends it as part
of the message.
- Messages are sent continuously with a delay of 1 ms.
- Uses ZMQ_PUB socket and IPC protocol.
recv_process: - After receiving the message, calculates the time
difference between send and receive times.
- If the difference is greater than 10 ms, prints
the message number and delay info. (In the actual test code I am sending this
to a logging application using ZMQ_PUB and the TCP protocol.)
- Uses ZMQ_SUB and IPC protocol.
*Test ran for more than 48hrs.*
*NOTE: Message delay will be seen only after running the test for a couple
of hours. (in my case after 10 hrs)*
*Test Configuration:*
- Freescale quad core ARM processor.
- 1 GHz processor
- OS : Ubuntu 10.04
Thanks,
Divya
On Mon, Apr 15, 2013 at 12:52 PM, Pieter Hintjens <[email protected]> wrote:
> Hi Asif,
>
> I'm hoping Divya will provide us with reusable tests that show the problem.
>
> -Pieter
>
>
>
>
>
> On Mon, Apr 15, 2013 at 7:50 AM, asif saeed <[email protected]> wrote:
> > Excellent, Divya. Very good analysis.
> >
> > @Pieter,
> >
> > I am also going to use ZeroMQ in an Ubuntu environment and Divya's
> analysis
> > is really worrying me. Does ZeroMQ have benchmarks on non real-time
> systems?
> > What kind of performance tests are there to test the performance?
> >
> > Sincerely,
> >
> > -Asif
> >
> >
> > On Fri, Apr 12, 2013 at 10:29 PM, Divya Mohan <[email protected]>
> > wrote:
> >>
> >> Hi Pieter,
> >>
> >> I tried the same performance tests with POSIX message queues and there
> was
> >> no delay in receiving the messages. The same test with ZeroMQ showed
> delays
> >> of up to 470 msec. Doesn't this mean that we can rule out the possibility
> of
> >> another process taking up CPU time randomly (as you had suggested
> earlier)?
> >>
> >> Are there any sleep/wait (for semaphores) in the zeromq code that could
> >> send my process to suspended/sleep state. I am not able to figure out
> why
> >> this delay is seen when using ZeroMq and not with message queues.
> >>
> >>
> >>
> >> Thanks,
> >>
> >> Divya
> >>
> >>
> >>
> >>
> >>
> >> On Thu, Mar 28, 2013 at 3:54 PM, Pieter Hintjens <[email protected]> wrote:
> >>>
> >>> On Wed, Mar 27, 2013 at 4:51 PM, Divya Mohan <[email protected]
> >
> >>> wrote:
> >>>
> >>> > Current and default scheduling policy is SCHED_OTHER. I had given
> >>> > highest
> >>> > priority to my receive process thinking delay due to scheduling will
> be
> >>> > taken care of but even then the messages are delayed.
> >>> >
> >>> > Also I am not using any sleep in my receive process. Only send
> process
> >>> > has
> >>> > a sleep of 1 msec.
> >>>
> >>> In general changing thread priorities is a bad idea.
> >>>
> >>> Perhaps you have another process taking CPU time at random occasions,
> >>> causing the sender I/O thread to be swapped out.
> >>>
> >>> If you want guarantees that there are no spikes you'll have to use a
> >>> real time kernel. It will increase average latency but remove the
> >>> spikes.
> >>>
> >>> -Pieter
> >>> _______________________________________________
> >>> zeromq-dev mailing list
> >>> [email protected]
> >>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
> >>
> >>
> >>
> >> _______________________________________________
> >> zeromq-dev mailing list
> >> [email protected]
> >> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
> >>
> >
> >
> > _______________________________________________
> > zeromq-dev mailing list
> > [email protected]
> > http://lists.zeromq.org/mailman/listinfo/zeromq-dev
> >
> _______________________________________________
> zeromq-dev mailing list
> [email protected]
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>
#include<errno.h>
#include<stdlib.h>
#include<pthread.h>
#include<string.h>
#include<stdio.h>
#include<unistd.h>
#include<sys/time.h>
#include<sys/resource.h>
#include"zmq/zmq.h"
/* ZeroMQ context created in main; both sockets below are opened on it. */
void *process_context;
/* PUB socket: opened and closed in main, not otherwise used in the code shown here. */
void *publish_socket;
/* SUB socket: connected and subscribed in main, read by RecvBcastMesg. */
void *sub_socket;
/* Thread handles; only slot 0 is used here, for the RecvBcastMesg thread. */
pthread_t thread_id[2];
/*
 * Payload of one test message as read from the SUB socket.
 * The raw bytes of this struct are received as-is, so the field order and
 * types must match what the sending program puts on the wire.
 */
typedef struct MessageDetails
{
    unsigned int msg_no;       /* message number reported when a message is late */
    int          time_in_sec;  /* seconds part of the send timestamp */
    int          time_in_usec; /* microseconds part of the send timestamp */
} MessageDetails;
/*
* Recieves bcast messages from other process.
*/
void* RecvBcastMesg (void *arg)
{
int return_code = -1;
unsigned int missed_count = 0;
static unsigned int last_recvd_count= 0;
MessageDetails my_msg_details;
struct timeval tv;
struct timeval s_time;
struct timezone tz;
double recv_time;
double send_time;
unsigned int delay_time;
unsigned int time_delayed;
while(1)
{
memset(&my_msg_details, 0, sizeof( my_msg_details ));
return_code = zmq_recv(sub_socket,&my_msg_details,sizeof( my_msg_details ),0);
gettimeofday(&tv,&tz);
if (return_code > 0)
{
s_time.tv_sec = my_msg_details.time_in_sec;
s_time.tv_usec = my_msg_details.time_in_usec;
recv_time = (double)tv.tv_sec + ((double)tv.tv_usec)/ (1000 * 1000);
send_time = (double)s_time.tv_sec + ((double) s_time.tv_usec) /(1000 * 1000);
delay_time = (unsigned int) ((recv_time- send_time) * 1000 * 1000) ;
if(delay_time > 10000) //10msec
{
time_delayed = delay_time - 10000;
printf("MsgNo: %d delayed by time:%dusecs\n",my_msg_details.msg_no, time_delayed);
//In real test, a function is called to send this info to a logging process using ZMQ PUB/SUB- TCP protocol.
}
}//end if
}//end while
}
/*
 * Receiver program entry point.
 *
 * Creates the ZeroMQ context, connects a SUB socket to ipc://BcastProc3.ipc
 * with an empty (match-everything) subscription, opens a PUB socket, then
 * runs RecvBcastMesg in a thread and waits for it.
 *
 * Any setup failure is reported and the program stops, releasing only what
 * was already acquired, instead of carrying on with a NULL context or socket.
 *
 * returns: EXIT_SUCCESS when the receive thread was joined and everything
 *          was released cleanly, EXIT_FAILURE otherwise.
 */
int main()
{
    int return_code = -1;
    int exit_code = EXIT_FAILURE;

    process_context = zmq_ctx_new();
    if (NULL == process_context)
    {
        printf("Context Creation Error\n");
        return exit_code;
    }

    sub_socket = zmq_socket(process_context, ZMQ_SUB);
    if (NULL == sub_socket)
    {
        return_code= zmq_errno();
        printf("Socket Open error:%d:%s\n",return_code,zmq_strerror(return_code));
        goto destroy_context;
    }

    return_code= zmq_connect(sub_socket,"ipc://BcastProc3.ipc");
    if (-1 == return_code)
    {
        return_code= zmq_errno();
        printf("Socket Connect error:%d:%s\n",return_code,zmq_strerror(return_code));
        goto close_sub;
    }

    /* Zero-length filter: subscribe to every message. */
    return_code= zmq_setsockopt(sub_socket,ZMQ_SUBSCRIBE,NULL,0);
    if (-1 == return_code)
    {
        return_code= zmq_errno();
        printf("Socket opt error:%d:%s\n",return_code,zmq_strerror(return_code));
        goto close_sub;
    }

    publish_socket = zmq_socket(process_context, ZMQ_PUB);
    if (NULL == publish_socket)
    {
        return_code= zmq_errno();
        printf("Socket Open error:%d:%s\n",return_code,zmq_strerror(return_code));
        goto close_sub;
    }

    return_code = pthread_create(&thread_id[0],NULL,&RecvBcastMesg,NULL);
    if(0 != return_code)
    {
        printf( "Thread Creation-RecvBcastMesg: %d", return_code);
        goto close_pub;
    }

    //MainProcess should wait till threads are complete
    return_code = pthread_join(thread_id[0], NULL);
    if(0 != return_code)
    {
        printf( "Thread Join-RecvBcastMesg: %d", return_code);
    }
    else
    {
        exit_code = EXIT_SUCCESS;
    }

    /* Cleanup runs in reverse order of acquisition; a failure at any step
     * turns the exit status into EXIT_FAILURE. */
close_pub:
    return_code= zmq_close(publish_socket);
    if (0 != return_code)
    {
        printf("PublishSocket close Error : %d", return_code);
        exit_code = EXIT_FAILURE;
    }
close_sub:
    return_code= zmq_close(sub_socket);
    if (0!= return_code)
    {
        printf( "Sub-Socket close Error : %d", return_code);
        exit_code = EXIT_FAILURE;
    }
destroy_context:
    return_code= zmq_ctx_destroy(process_context);
    if (0 != return_code)
    {
        printf( "Context deletion Error : %d", return_code);
        exit_code = EXIT_FAILURE;
    }
    return exit_code;
}
//Sending Process
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/time.h>
#include "zmq/zmq.h"
/*
 * Payload of one test message as written to the PUB socket.
 * The raw bytes of this struct are sent as-is, so the field order and types
 * must match what the receiving program expects.
 */
typedef struct MessageDetails
{
    unsigned int msg_no;       /* running message number assigned by SendMessage */
    int          time_in_sec;  /* tv_sec of the gettimeofday reading taken before sending */
    int          time_in_usec; /* tv_usec of the same reading */
} MessageDetails;
/* Not referenced by SendMessage or main (the only functions shown here);
 * SendMessage numbers messages with its own function-local counter. */
unsigned int send_count =0;
/* ZeroMQ context created in main. */
void *process_context;
/* PUB socket bound in main and written to by SendMessage. */
void *pub_socket;
static int SendMessage( void )
{
int return_code = -1;
int retry_count = 0;
MessageDetails my_msg_details;
struct timezone tz;
struct timeval send_time;
static unsigned int send_count = 1;
memset(&my_msg_details, 0 , sizeof(my_msg_details) );
my_msg_details.msg_no = send_count++;
usleep(1000); //1msec
gettimeofday(&send_time,&tz);
my_msg_details.time_in_sec = send_time.tv_sec;
my_msg_details.time_in_usec = send_time.tv_usec;
return_code = zmq_send( pub_socket,(unsigned char *)&my_msg_details,sizeof(my_msg_details),0);
if (return_code < 0)
{
printf("Send Error\n");
}
return( return_code );
}
/*
 * Sender program entry point.
 *
 * Creates the ZeroMQ context, binds a PUB socket to ipc://BcastProc3.ipc,
 * waits one second, then calls SendMessage in a loop.
 *
 * Setup failures are reported and end the program instead of letting the
 * loop run against a NULL context or socket. The loop stops at the first
 * failed send so that the socket and context are actually released.
 *
 * returns: EXIT_FAILURE. The send loop only ends on an error, so there is no
 *          successful way out of this program.
 */
int main()
{
    int return_code = -1;

    process_context = zmq_ctx_new();
    if (NULL == process_context)
    {
        printf("Context Creation Error\n");
        return EXIT_FAILURE;
    }

    pub_socket = zmq_socket(process_context, ZMQ_PUB);
    if (NULL == pub_socket)
    {
        return_code= zmq_errno();
        printf("Socket Open error:%d:%s\n",return_code,zmq_strerror(return_code));
        goto destroy_context;
    }

    return_code= zmq_bind(pub_socket,"ipc://BcastProc3.ipc");
    if (-1 == return_code)
    {
        return_code= zmq_errno();
        printf("Socket Bind error:%d:%s\n",return_code,zmq_strerror(return_code));
        goto close_socket;
    }

    /* NOTE(review): presumably gives subscribers time to connect before the
     * first message is published - confirm. */
    sleep(1);

    /* SendMessage reports its own errors; a negative result ends the test. */
    while ( SendMessage() >= 0 )
    {
    }

close_socket:
    return_code= zmq_close(pub_socket);
    if (0 != return_code)
    {
        printf("PublishSocket close Error : %d", return_code);
    }
destroy_context:
    return_code= zmq_ctx_destroy(process_context);
    if (0 != return_code)
    {
        printf( "Context deletion Error : %d", return_code);
    }
    return EXIT_FAILURE;
}
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev