Paul,
With messages being sent one way, via pub and sub sockets, i am getting a
very decent performance. About 80% of our network gets saturated.
The code is zserver.cpp and zclient.cpp

But if i configure the software such that client only sends the next
message, after it has received a response from the server, the throughput
 is really bad.
The code is zserver-ack1.cpp  and zclient-ack1.cpp
The difference is that in the former case, i can get 110k messages per
second , whereas in the latter case,  i can only get 1k messages per second.
The sockets that i use in latter case are of type REQ and REP. Am i using
wrong sockets type ?



On Tue, Sep 18, 2012 at 9:38 PM, Maninder Batth <[email protected]>wrote:

> Paul,
> Thank you again for your help. Now with message copying, i am getting a
> throughput of .8Gb, which is what i would have expected on a 1Gb network.
>
>
> On Tue, Sep 18, 2012 at 4:20 PM, Paul Colomiets <[email protected]>wrote:
>
>> Hi Maninder,
>>
>> On Tue, Sep 18, 2012 at 10:21 PM, Maninder Batth
>> <[email protected]> wrote:
>> > Paul,
>> > Here is number of messages as seen by the server in one second. Each
>> message
>> > is 1024 byte excluding tcp/ip and zmq headers. Based on these numbers
>> and i
>> > am getting a throughput of 1.4 Gb/sec.
>> > Enclosed is the source code for the server and the client.
>> >
>>
>> Zeromq closes the message after sending. So you effectively send
>> messages of the zero length after first one. You should use
>> zmq_msg_copy (or whatever C++ API is there) before  doing send() in
>> case you want to reuse message.
>>
>> --
>> Paul
>> _______________________________________________
>> zeromq-dev mailing list
>> [email protected]
>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>>
>
>
#include <string>
#include <iostream>
#include "zmq.hpp"
#include "boost/chrono.hpp"

typedef boost::chrono::duration<long, boost::milli> milliseconds;

std::string * makeString(int i);

int main()
{
	zmq::context_t context(1);
	std::cout << "About to create a socket" << std::endl;
	zmq::socket_t socket(context, ZMQ_PUB);
	std::cout << "Socket created " << std::endl;

	long numOfMsgs =0; 
	int size = 1024;
	std::string *s = makeString(size);
	std::cout << "Message is " << std::endl;
	std::cout << *s << std::endl;
	std::cout << "String length  is " << s->length() << std::endl;

	zmq::message_t * request1 = new zmq::message_t(size);
	
	memcpy((void *) request1->data(),(void *)  s, 1024);

	socket.connect("tcp://173.229.133.114:55555");
	
	boost::chrono::system_clock::time_point start = boost::chrono::system_clock::now();
	milliseconds ms(1000);
	boost::chrono::system_clock::time_point end = start + ms;
	zmq::message_t * request = new zmq::message_t(size);

	while(true)
	{
	do
	{
		request->copy(request1);
		bool success = socket.send(*request);
		if (success)
			numOfMsgs++;
	} while(boost::chrono::system_clock::now() < end);

	std::cout << "Number of messages send in one sec: " << numOfMsgs << std::endl;
	end = boost::chrono::system_clock::now() + ms;
	numOfMsgs = 0;
	}

	return 0;
}

std::string * makeString(int i)
{
char data[1024];
for(int j=0;j<i;j++)
{
	data[j] = 'a';
}
std::string *a = new std::string(data, 1024);
return a;
}


#include <string>
#include <iostream>
#include "zmq.hpp"
#include "boost/chrono.hpp"

typedef boost::chrono::duration<long, boost::milli> milliseconds;

std::string * makeString(int i);

int main()
{
	zmq::context_t context(1);
	std::cout << "About to create a socket" << std::endl;
	zmq::socket_t socket(context, ZMQ_REQ);
	std::cout << "Socket created " << std::endl;

	long numOfMsgs =0; 
	int size = 1024;
	std::string *s = makeString(size);
	std::cout << "Message is " << std::endl;
	std::cout << *s << std::endl;
	std::cout << "String length  is " << s->length() << std::endl;

	zmq::message_t *request1 = new zmq::message_t(1024);
	
	memcpy((void *) request1->data(),(void *)  s, 1024);

	socket.connect("tcp://173.229.133.114:55555");
	
	boost::chrono::system_clock::time_point start = boost::chrono::system_clock::now();
	milliseconds ms(1000);
	boost::chrono::system_clock::time_point end = start + ms;

	zmq::message_t *request = new zmq::message_t(1024);
	while(true)
	{
	do
	{
		request->copy(request1);
		socket.send(*request);
		zmq::message_t response;
		socket.recv(&response);
		numOfMsgs++;
	} while(boost::chrono::system_clock::now() < end);

	std::cout << "Number of messages send in ten sec: " << numOfMsgs << std::endl;
	end = boost::chrono::system_clock::now() + ms;
	numOfMsgs = 0;
	}

	return 0;
}

std::string * makeString(int i)
{
char data[1024];
for(int j=0;j<i;j++)
{
	data[j] = 'a';
}
std::string *a = new std::string(data, 1024);
return a;
}


#include <zmq.hpp>
#include <string>
#include <iostream>
#include <unistd.h>
#include "boost/chrono.hpp"

typedef boost::chrono::duration<long, boost::milli> milliseconds;
typedef boost::chrono::system_clock::time_point time_point;

using namespace std;

void handleTimeout(long & numOfMsgs, time_point & timeout);
time_point now();
bool isTimeout(time_point earlier);
time_point getNewTimeout();

milliseconds ms(1000); //one second

int main()
{

zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_SUB);
socket.setsockopt(ZMQ_SUBSCRIBE,"", strlen(""));

socket.bind("tcp://173.229.133.114:55555");
long numOfMsgs = 0;

time_point timeout = getNewTimeout();
do {
	zmq::message_t request;
	socket.recv(&request);
//	cout << "Msg recvd" << endl;
	if (isTimeout(timeout))
		handleTimeout(numOfMsgs, timeout);
	else
		numOfMsgs++;

   } while (true);

return 0;
}

void handleTimeout(long & numOfMsgs, time_point & timeout)
{
cout << "Number of msgs in one second: " << numOfMsgs++ << endl;
numOfMsgs = 1;
timeout = getNewTimeout();
}

time_point now()
{
	return  boost::chrono::system_clock::now();
}

bool isTimeout(time_point timeout)
{
	return ((now() < timeout) ? false : true);
}

time_point getNewTimeout()
{
	return (now() + ms);
}


#include <zmq.hpp>
#include <string>
#include <iostream>
#include <unistd.h>
#include "boost/chrono.hpp"

typedef boost::chrono::duration<long, boost::milli> milliseconds;
typedef boost::chrono::system_clock::time_point time_point;

using namespace std;

void handleTimeout(long & numOfMsgs, time_point & timeout);
time_point now();
bool isTimeout(time_point earlier);
time_point getNewTimeout();

milliseconds ms(1000); //ten seconds

int main()
{

zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_REP);
//socket.setsockopt(ZMQ_SUBSCRIBE,"", strlen(""));

socket.bind("tcp://173.229.133.114:55555");
long numOfMsgs = 0;

zmq::message_t *response1 = new zmq::message_t(5);
memcpy((void *) response1->data(), "World", 5);

zmq::message_t *response = new zmq::message_t(5);

time_point timeout = getNewTimeout();
do {
	zmq::message_t request;

	socket.recv(&request);
	if (isTimeout(timeout))
		handleTimeout(numOfMsgs, timeout);
	else
		numOfMsgs++;
	response->copy(response1);
	socket.send(*response);	

   } while (true);

return 0;
}

void handleTimeout(long & numOfMsgs, time_point & timeout)
{
cout << "Number of msgs in ten seconds: " << numOfMsgs++ << endl;
numOfMsgs = 1;
timeout = getNewTimeout();
}

time_point now()
{
	return  boost::chrono::system_clock::now();
}

bool isTimeout(time_point timeout)
{
	return ((now() < timeout) ? false : true);
}

time_point getNewTimeout()
{
	return (now() + ms);
}


_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to