Paul,
With messages sent one way, via PUB and SUB sockets, I am getting very
decent performance: about 80% of our network bandwidth is saturated.
The code is zserver.cpp and zclient.cpp
But if I configure the software so that the client only sends the next
message after it has received a response from the server, the throughput
is really bad.
The code is zserver-ack1.cpp and zclient-ack1.cpp
The difference is that in the former case I can get 110k messages per
second, whereas in the latter case I can only get 1k messages per second.
The sockets that I use in the latter case are of type REQ and REP. Am I using
the wrong socket types?
On Tue, Sep 18, 2012 at 9:38 PM, Maninder Batth <[email protected]>wrote:
> Paul,
> Thank you again for your help. Now with message copying, i am getting a
> throughput of .8Gb, which is what i would have expected on a 1Gb network.
>
>
> On Tue, Sep 18, 2012 at 4:20 PM, Paul Colomiets <[email protected]>wrote:
>
>> Hi Maninder,
>>
>> On Tue, Sep 18, 2012 at 10:21 PM, Maninder Batth
>> <[email protected]> wrote:
>> > Paul,
>> > Here is number of messages as seen by the server in one second. Each
>> message
>> > is 1024 byte excluding tcp/ip and zmq headers. Based on these numbers
>> and i
>> > am getting a throughput of 1.4 Gb/sec.
>> > Enclosed is the source code for the server and the client.
>> >
>>
>> Zeromq closes the message after sending. So you effectively send
>> messages of the zero length after first one. You should use
>> zmq_msg_copy (or whatever C++ API is there) before doing send() in
>> case you want to reuse message.
>>
>> --
>> Paul
>> _______________________________________________
>> zeromq-dev mailing list
>> [email protected]
>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>>
>
>
#include <string>
#include <iostream>
#include "zmq.hpp"
#include "boost/chrono.hpp"
// Duration type that counts whole milliseconds in a long.
typedef boost::chrono::duration<long, boost::milli> milliseconds;
// Returns a heap-allocated std::string (defined further down in this file);
// the caller owns the returned pointer.
std::string * makeString(int i);
/**
 * PUB throughput client.
 *
 * Connects a ZMQ_PUB socket to the server, then publishes the same payload in
 * a tight loop and prints the number of successful sends once per second.
 * The loop never terminates; stop the process to end the run.
 */
int main()
{
    zmq::context_t context(1);
    std::cout << "About to create a socket" << std::endl;
    zmq::socket_t socket(context, ZMQ_PUB);
    std::cout << "Socket created " << std::endl;

    long numOfMsgs = 0;
    int size = 1024;
    std::string *s = makeString(size);
    std::cout << "Message is " << std::endl;
    std::cout << *s << std::endl;
    std::cout << "String length is " << s->length() << std::endl;

    // Template message holding the payload. Copy the string's characters
    // (s->data()), not the bytes of the std::string object that `s` points
    // to, and never copy more bytes than the string actually holds.
    zmq::message_t request1(s->length());
    memcpy(request1.data(), s->data(), s->length());
    delete s; // payload now lives in request1

    socket.connect("tcp://173.229.133.114:55555");

    boost::chrono::system_clock::time_point start = boost::chrono::system_clock::now();
    milliseconds ms(1000); // length of one reporting interval
    boost::chrono::system_clock::time_point end = start + ms;

    zmq::message_t request;
    while (true)
    {
        do
        {
            // Sending empties the message, so refill it from the template
            // before every send.
            request.copy(&request1);
            if (socket.send(request))
                numOfMsgs++;
        } while (boost::chrono::system_clock::now() < end);

        std::cout << "Number of messages send in one sec: " << numOfMsgs << std::endl;
        end = boost::chrono::system_clock::now() + ms;
        numOfMsgs = 0;
    }
    return 0;
}
/**
 * Builds a heap-allocated string made of `i` copies of the character 'a'.
 *
 * The previous implementation filled a fixed 1024-byte stack buffer, which
 * overflowed for i > 1024 and exposed uninitialised bytes for i < 1024.
 * The length now always equals `i`; a non-positive `i` yields an empty string.
 *
 * @param i requested length of the string in characters
 * @return pointer to a new std::string; the caller owns it and must delete it
 */
std::string * makeString(int i)
{
    const std::string::size_type length =
        (i > 0) ? static_cast<std::string::size_type>(i) : 0;
    return new std::string(length, 'a');
}
#include <string>
#include <iostream>
#include "zmq.hpp"
#include "boost/chrono.hpp"
// Duration type that counts whole milliseconds in a long.
typedef boost::chrono::duration<long, boost::milli> milliseconds;
// Returns a heap-allocated std::string (defined further down in this file);
// the caller owns the returned pointer.
std::string * makeString(int i);
/**
 * REQ round-trip client.
 *
 * Connects a ZMQ_REQ socket to the server, then repeatedly sends the payload
 * and waits for the reply before sending again. The number of completed
 * request/reply pairs is printed once per second. The loop never terminates;
 * stop the process to end the run.
 *
 * NOTE(review): because each send is followed by a blocking recv, the rate is
 * bounded by network round-trip time rather than by bandwidth.
 */
int main()
{
    zmq::context_t context(1);
    std::cout << "About to create a socket" << std::endl;
    zmq::socket_t socket(context, ZMQ_REQ);
    std::cout << "Socket created " << std::endl;

    long numOfMsgs = 0;
    int size = 1024;
    std::string *s = makeString(size);
    std::cout << "Message is " << std::endl;
    std::cout << *s << std::endl;
    std::cout << "String length is " << s->length() << std::endl;

    // Template message holding the payload. Copy the string's characters
    // (s->data()), not the bytes of the std::string object that `s` points
    // to, and never copy more bytes than the string actually holds.
    zmq::message_t request1(s->length());
    memcpy(request1.data(), s->data(), s->length());
    delete s; // payload now lives in request1

    socket.connect("tcp://173.229.133.114:55555");

    boost::chrono::system_clock::time_point start = boost::chrono::system_clock::now();
    milliseconds ms(1000); // length of one reporting interval
    boost::chrono::system_clock::time_point end = start + ms;

    zmq::message_t request;
    while (true)
    {
        do
        {
            // Sending empties the message, so refill it from the template
            // before every send.
            request.copy(&request1);
            socket.send(request);
            zmq::message_t response;
            socket.recv(&response);
            numOfMsgs++;
        } while (boost::chrono::system_clock::now() < end);

        // The interval is 1000 ms, so report it as one second.
        std::cout << "Number of messages send in one sec: " << numOfMsgs << std::endl;
        end = boost::chrono::system_clock::now() + ms;
        numOfMsgs = 0;
    }
    return 0;
}
/**
 * Builds a heap-allocated string made of `i` copies of the character 'a'.
 *
 * The previous implementation filled a fixed 1024-byte stack buffer, which
 * overflowed for i > 1024 and exposed uninitialised bytes for i < 1024.
 * The length now always equals `i`; a non-positive `i` yields an empty string.
 *
 * @param i requested length of the string in characters
 * @return pointer to a new std::string; the caller owns it and must delete it
 */
std::string * makeString(int i)
{
    const std::string::size_type length =
        (i > 0) ? static_cast<std::string::size_type>(i) : 0;
    return new std::string(length, 'a');
}
#include <zmq.hpp>
#include <string>
#include <iostream>
#include <unistd.h>
#include "boost/chrono.hpp"
// Duration type that counts whole milliseconds in a long.
typedef boost::chrono::duration<long, boost::milli> milliseconds;
// Point in time on the boost system clock; used for the reporting deadline.
typedef boost::chrono::system_clock::time_point time_point;
using namespace std;
// Prints the current message count and starts a new reporting interval.
void handleTimeout(long & numOfMsgs, time_point & timeout);
// Returns the current system-clock time.
time_point now();
// Returns true once the current time has reached the given deadline.
bool isTimeout(time_point earlier);
// Returns the deadline one reporting interval (ms) from now.
time_point getNewTimeout();
// Length of one reporting interval.
milliseconds ms(1000); //one second
/**
 * SUB throughput server.
 *
 * Binds a ZMQ_SUB socket, subscribes with an empty prefix and receives
 * messages forever. Each message either bumps the counter or, once the
 * current deadline has passed, triggers the per-interval report.
 */
int main()
{
    zmq::context_t context(1);
    zmq::socket_t socket(context, ZMQ_SUB);
    // Zero-length subscription prefix.
    socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
    socket.bind("tcp://173.229.133.114:55555");

    long numOfMsgs = 0;
    time_point timeout = getNewTimeout();
    for (;;)
    {
        zmq::message_t request;
        socket.recv(&request);
        // cout << "Msg recvd" << endl;
        if (!isTimeout(timeout))
        {
            numOfMsgs++;
            continue;
        }
        // Deadline reached: report and start the next interval.
        handleTimeout(numOfMsgs, timeout);
    }
    return 0;
}
/**
 * Reports the message count for the interval that just ended and starts a
 * new interval.
 *
 * @param numOfMsgs in: count to print; out: reset to 1
 * @param timeout   out: replaced with a fresh deadline from getNewTimeout()
 */
void handleTimeout(long & numOfMsgs, time_point & timeout)
{
    const long received = numOfMsgs;
    cout << "Number of msgs in one second: " << received << endl;
    // Restart the count at 1 rather than 0.
    numOfMsgs = 1;
    timeout = getNewTimeout();
}
/** Returns the current time as read from boost's system clock. */
time_point now()
{
    const time_point current = boost::chrono::system_clock::now();
    return current;
}
/**
 * Tells whether a deadline has been reached.
 *
 * @param timeout deadline to compare against the current time
 * @return false while now() is still before the deadline, true otherwise
 */
bool isTimeout(time_point timeout)
{
    const bool stillBefore = (now() < timeout);
    return !stillBefore;
}
/** Returns the deadline that lies one interval (the global `ms`) after now(). */
time_point getNewTimeout()
{
    const time_point current = now();
    return current + ms;
}
#include <zmq.hpp>
#include <string>
#include <iostream>
#include <unistd.h>
#include "boost/chrono.hpp"
// Duration type that counts whole milliseconds in a long.
typedef boost::chrono::duration<long, boost::milli> milliseconds;
// Point in time on the boost system clock; used for the reporting deadline.
typedef boost::chrono::system_clock::time_point time_point;
using namespace std;
// Prints the current message count and starts a new reporting interval.
void handleTimeout(long & numOfMsgs, time_point & timeout);
// Returns the current system-clock time.
time_point now();
// Returns true once the current time has reached the given deadline.
bool isTimeout(time_point earlier);
// Returns the deadline one reporting interval (ms) from now.
time_point getNewTimeout();
// Length of one reporting interval.
milliseconds ms(1000); // 1000 ms = one second
/**
 * REP round-trip server.
 *
 * Binds a ZMQ_REP socket and, forever, receives one request and answers it
 * with the 5-byte payload "World". Each request either bumps the counter or,
 * once the current deadline has passed, triggers the per-interval report.
 *
 * The reply messages live on the stack; the previous version allocated them
 * with `new` and never released them.
 */
int main()
{
    zmq::context_t context(1);
    zmq::socket_t socket(context, ZMQ_REP);
    //socket.setsockopt(ZMQ_SUBSCRIBE,"", strlen(""));
    socket.bind("tcp://173.229.133.114:55555");

    long numOfMsgs = 0;

    // Template reply; copied into `response` before every send.
    zmq::message_t response1(5);
    memcpy(response1.data(), "World", 5);
    zmq::message_t response;

    time_point timeout = getNewTimeout();
    do {
        zmq::message_t request;
        socket.recv(&request);
        if (isTimeout(timeout))
            handleTimeout(numOfMsgs, timeout);
        else
            numOfMsgs++;
        // Sending empties the message, so refill it from the template
        // before every send.
        response.copy(&response1);
        socket.send(response);
    } while (true);
    return 0;
}
/**
 * Reports the message count for the interval that just ended and starts a
 * new interval.
 *
 * The interval is the global `ms` (1000 ms), so the report now says
 * "one second"; the earlier "ten seconds" text did not match the interval.
 *
 * @param numOfMsgs in: count to print; out: reset to 1
 * @param timeout   out: replaced with a fresh deadline from getNewTimeout()
 */
void handleTimeout(long & numOfMsgs, time_point & timeout)
{
    cout << "Number of msgs in one second: " << numOfMsgs << endl;
    // Restart the count at 1 rather than 0.
    numOfMsgs = 1;
    timeout = getNewTimeout();
}
/** Returns the current time as read from boost's system clock. */
time_point now()
{
    const time_point current = boost::chrono::system_clock::now();
    return current;
}
/**
 * Tells whether a deadline has been reached.
 *
 * @param timeout deadline to compare against the current time
 * @return false while now() is still before the deadline, true otherwise
 */
bool isTimeout(time_point timeout)
{
    const bool stillBefore = (now() < timeout);
    return !stillBefore;
}
/** Returns the deadline that lies one interval (the global `ms`) after now(). */
time_point getNewTimeout()
{
    const time_point current = now();
    return current + ms;
}
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev