Hi Martin,
Thanks for your response.
The problem is reproducible.
Attaching the pub code and the sub code, and the batch file which
operates both (attached as pubsub.txt, please rename to pubsub.bat).
Please run "pubsub.bat 10 1000".
This will start one publisher which sends 1000 messages to 10 subscribers.
When each subscriber has received all 1000 messages, it dumps the
results to a file (Sub_<subscriber_number>.tda).
The leftmost column is the QPC value when the message was sent, and the
one to the right is the QPC value when it was received.
The difference between them is how long the message took to
arrive. You can see that the difference (in every .tda file) for the
last message (the 1000th) is ~20 times bigger than for the rest.
Thanks,
Omer.
On Wed, Oct 12, 2011 at 8:57 AM, Martin Sustrik <[email protected]> wrote:
> Hi Omer,
>
>> I'm testing a scenario in which I have one publisher sending messages
>> to multiple subscribers.
>> In the message the publisher sends I'm sending the value of
>> QueryPerformanceCounter and when the subscriber gets a message it
>> calculates the difference between the current QueryPerformanceCounter
>> to the one stored in the message.
>>
>> It seems that for 1000 messages sent from the publisher to the
>> subscribers, the last one takes around 20 times than the average time
>> it takes for the previous ones to arrive.
>> Seems as the last one is "stuck" and in some queue and is released
>> probably after a timeout.
>
> There's no flush timeout in 0MQ code, so the delay is either a bug in 0MQ or
> a side effect of how latency is measured.
>
> Is the problem reproducible? If so, can you provide the test program?
>
> Martin
>
--
Omer Bacharach
Director of Software Development
Toot Trading Ltd.
[email protected]
Mobile +972-544-317959
Office +972-3-7449466
@rem pubsub.bat <subscriber_count> <message_count>
@rem Starts <subscriber_count> subscribers (sub.exe), then one publisher
@rem (pub.exe) which sends <message_count> messages.
@Set Counter=1
:loop
@IF %Counter% LEQ %1 goto sub
:pub
@start pub.exe %2
@goto END
:sub
@rem sub.exe requires two arguments: the message count and the subscriber
@rem number. The subscriber number selects its output file, Sub_<number>.tda.
@start "Subscriber %Counter%" sub.exe %2 %Counter%
@Set /A Counter+=1
@goto loop
:END
// sub.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
//
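// Latency test subscriber in C++ (adapted from the weather update client)
// Connects SUB socket to tcp://localhost:5556
// Records the sent/received QueryPerformanceCounter value of every message
// and dumps them to Sub_<subscriber number>.tda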
//
// Olivier Chamoux <[email protected]>
//
#include <zmq.hpp>
#include <iostream>
#include <sstream>
// Latency-test subscriber.
//
// Usage: sub.exe <repetitions> <subscriber_number>
//
// Receives <repetitions> messages from tcp://localhost:5556, records for each
// one the QueryPerformanceCounter value carried in the message (send time) and
// the local QueryPerformanceCounter value on receipt, then writes all pairs to
// Sub_<subscriber_number>.tda as "<sent>\t<received>" lines.
//
// Returns 0 on success, -1 on a wrong argument count, a message that is too
// short, or when the output file cannot be opened.
int main (int argc, char *argv[])
{
    // Layout of one message on the wire; it must stay identical to the INPUT
    // struct the publisher fills in, because the bytes are copied verbatim.
    struct INPUT{
        int i;
        char msg[10];
        LARGE_INTEGER t;
    };

    if (argc!=3)
        return -1;
    const int repetitions = atoi(argv[1]);
    const int subCount = atoi(argv[2]);

    zmq::context_t context (1);
    zmq::socket_t subscriber (context, ZMQ_SUB);
    subscriber.connect("tcp://localhost:5556");
    // An empty prefix subscribes to every message.
    subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);

    // (received QPC, sent QPC) for every message. Reserve up front so the
    // vector never reallocates inside the timed receive loop.
    std::vector<std::pair<long long,long long>> dump;
    if (repetitions > 0)
        dump.reserve(repetitions);

    for (int update_nbr = 0; update_nbr < repetitions; update_nbr++)
    {
        INPUT input;
        zmq::message_t update(sizeof(input));
        subscriber.recv(&update);

        // recv() replaces the message content, so its size is whatever the
        // peer sent. Refuse to read past the end of a short message.
        if (update.size() < sizeof(input))
        {
            fprintf(stderr,"message %d too short: %u bytes, expected %u\n",
                    update_nbr,(unsigned)update.size(),(unsigned)sizeof(input));
            return -1;
        }
        memcpy(&input,update.data(),sizeof(input));

        LARGE_INTEGER now;
        QueryPerformanceCounter(&now);
        dump.push_back(std::pair<long long,long long>(now.QuadPart,input.t.QuadPart));
    }

    // Min/max/average latency can be derived offline from the dump file, so
    // no statistics are computed here.
    char path[2000] = "";
    sprintf(path,"Sub_%d.tda",subCount);
    FILE* f = fopen(path,"w");
    if (f == NULL)
    {
        fprintf(stderr,"cannot open %s for writing\n",path);
        return -1;
    }

    // Dump to file: sent QPC in the left column, received QPC in the right.
    for (auto itr = dump.begin();itr!=dump.end();++itr)
    {
        const long long ReceivedQpc = itr->first;
        const long long SentQpc = itr->second;
        fprintf(f,"%lld\t%lld\n",SentQpc,ReceivedQpc);
    }
    fclose(f);
    return 0;
}
//
// Latency test publisher in C++ (adapted from the weather update server)
// Binds PUB socket to tcp://*:5556
// Publishes messages stamped with QueryPerformanceCounter, 10-100 ms apart
//
// Olivier Chamoux <[email protected]>
//
#include "stdafx.h"
#include <zmq.hpp>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <ctime>
#include <cstdlib>
#include <iostream>
// Latency-test publisher.
//
// Usage: pub.exe <repetitions>
//
// Binds a PUB socket to tcp://*:5556 and publishes <repetitions> messages.
// Before each message it sleeps a random 10-100 ms, then stamps the message
// with the current QueryPerformanceCounter value and sends it.
//
// Returns 0 on success, -1 on a wrong argument count.
int main (int argc, char *argv[]) {
    // Layout of one message on the wire; it must stay identical to the INPUT
    // struct the subscriber reads, because the bytes are copied verbatim.
    struct INPUT{
        int i;
        char msg[10];
        LARGE_INTEGER t;
    };

    // Validate the arguments before binding the port and waiting.
    if (argc!=2)
        return -1;
    const int repetitions = atoi(argv[1]);

    // Prepare our context and publisher
    zmq::context_t context (1);
    zmq::socket_t publisher (context, ZMQ_PUB);
    publisher.bind("tcp://*:5556");
    /*
    Adding a sleep in the publisher between binding a port and sending
    the messages. It looks like it's still connecting the subscriber
    while the first message is being published.
    */
    Sleep(2000);

    srand((unsigned)time(0));
    const int lowest=10, highest=100;
    const int range=(highest-lowest)+1;

    for (int update_nbr = 0; update_nbr < repetitions; update_nbr++)
    {
        INPUT m;
        // Zero the whole struct so padding and the unused tail of msg are not
        // sent as uninitialised memory.
        memset(&m,0,sizeof(m));
        zmq::message_t request (sizeof(m));

        // Scale rand() into [lowest, highest] in floating point so that
        // range*rand() cannot overflow int where RAND_MAX is large.
        const int random_integer = lowest+int(range*(rand()/(RAND_MAX + 1.0)));
        Sleep(random_integer);

        // Take the timestamp as late as possible before sending.
        QueryPerformanceCounter(&m.t);
        strcpy(m.msg,"message");
        m.i = update_nbr;
        memcpy (request.data (), &m, sizeof(m));
        publisher.send(request);
    }
    return 0;
}
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev