Re: [zeromq-dev] Only first X messages received with PGM/0MQ c++ setup

2018-04-16 Thread Luca Boccassi
On Mon, 2018-04-16 at 18:25 +0200, Johnny Berentsen wrote:
> 2018-04-16 14:35 GMT+02:00 Luca Boccassi :
> 
> > On Mon, 2018-04-16 at 13:54 +0200, Johnny Berentsen wrote:
> > > 2018-04-02 15:30 GMT+02:00 Johnny Berentsen :
> > > 
> > > > Hi all
> > > > 
> > > > 
> > > 
> > > Now I have a vector with size 2500 of std::strings filled with
> > > 4096 # characters. Every ten seconds I loop through the vector and
> > > send the data over an EPGM socket. If I go full throttle, only the
> > > first 1000 messages are received, but if I add a usleep(1) after
> > > each send, all messages are received.
> > 
> > Default high water mark is 1000 messages, most likely the receiving
> > side is too slow to process and so it starts dropping messages
> > 
> > 
> 
> You're absolutely right. I was put off by the documentation that
> said:
> The default *ZMQ_SNDHWM* value of zero means "no limit".
> 
> Shame on me for not trying though :/
> 
> I understand that setting the HWM is different depending on the usage
> pattern. However, I can't find much information on the different
> criteria that should be considered.
> I've come up with these:
> Typical and max packet size, network restrictions, number of packets,
> available memory, CPU
> 
> My application will run on a LAN on modern PCs with >=8GiB memory,
> up to a few kB per message and only a burst of a few thousand
> messages once in a while. What value is reasonable in such a case?

It mostly depends on your application and how quickly it can pull
messages off the socket. So the only way is to try and test it.
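
As a starting point for such a test, the worst-case memory cost of a
candidate HWM can be estimated before trying it. The sketch below is only
an estimate: queue_bytes and hwm_for_burst are hypothetical helper names,
the headroom percentage is an arbitrary choice, and libzmq's own
per-message overhead is ignored. The resulting value would then be set
with ZMQ_SNDHWM on the publisher and ZMQ_RCVHWM on the subscriber before
bind/connect.

```cpp
#include <cstdint>

// Worst-case payload bytes held in one socket queue that has filled up
// to its high water mark. Per-message bookkeeping inside libzmq is not
// counted, so treat the result as a lower bound.
constexpr std::uint64_t queue_bytes(std::uint64_t hwm_messages,
                                    std::uint64_t max_message_bytes)
{
    return hwm_messages * max_message_bytes;
}

// A HWM large enough to hold a whole burst plus some headroom.
// headroom_percent = 100 means "twice the burst size".
constexpr std::uint64_t hwm_for_burst(std::uint64_t burst_messages,
                                      std::uint64_t headroom_percent)
{
    return burst_messages + burst_messages * headroom_percent / 100;
}
```

With the numbers from the test program (2500 messages of 4096 bytes),
hwm_for_burst(2500, 100) gives 5000 and queue_bytes(5000, 4096) gives
20480000 bytes, about 20 MB per queue, which is small next to 8 GiB.
Whether that HWM is actually enough still has to be measured.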

> I also tested with checking the return value of send() but it
> returned true regardless of sending or not. Are there other ways of
> being notified that messages have been dropped?

No, that's the whole point about pub-sub - if you don't want a lossy
mechanism then you should use a different pattern.
Or you can set unlimited water marks and deal with the unbounded memory
usage.
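
If the application itself needs to know that something was dropped, one
common workaround is to have the publisher put a running sequence number
into every message and let the subscriber look for gaps. This is an
application-level convention, not a ZeroMQ feature. The names
missing_between and GapDetector below are hypothetical, and the sketch
assumes numbering starts at 1 and never wraps around.

```cpp
#include <cstdint>

// Number of messages lost between the last sequence number seen and the
// one that just arrived (0 when they are consecutive or out of order).
constexpr std::uint64_t missing_between(std::uint64_t last_seen,
                                        std::uint64_t current)
{
    return current > last_seen + 1 ? current - last_seen - 1 : 0;
}

// Subscriber-side helper: feed it the sequence number carried in each
// received message and it keeps a running total of the gaps.
class GapDetector
{
public:
    // Returns how many messages were skipped immediately before this one.
    std::uint64_t on_message(std::uint64_t seq)
    {
        const std::uint64_t lost = missing_between(last_seen_, seq);
        if (seq > last_seen_) last_seen_ = seq;
        total_lost_ += lost;
        return lost;
    }

    std::uint64_t total_lost() const { return total_lost_; }

private:
    std::uint64_t last_seen_ = 0;   // 0 means "nothing received yet"
    std::uint64_t total_lost_ = 0;
};
```

Feeding it 1, 2, 5 reports two lost messages on the third call. Two
limits are worth knowing: a subscriber that joins late counts everything
before its first message as lost, and messages dropped at the very end of
a burst are only noticed once the next message arrives.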

-- 
Kind regards,
Luca Boccassi

signature.asc
Description: This is a digitally signed message part
___
zeromq-dev mailing list
zeromq-dev@lists.zeromq.org
https://lists.zeromq.org/mailman/listinfo/zeromq-dev


Re: [zeromq-dev] Only first X messages received with PGM/0MQ c++ setup

2018-04-16 Thread Johnny Berentsen
2018-04-16 14:35 GMT+02:00 Luca Boccassi :

> On Mon, 2018-04-16 at 13:54 +0200, Johnny Berentsen wrote:
> > 2018-04-02 15:30 GMT+02:00 Johnny Berentsen :
> >
> > > Hi all
> > > 
> > >
> >
> > Now I have a vector with size 2500 of std::strings filled with 4096 #
> > characters. Every ten seconds I loop through the vector and send the
> > data over an EPGM socket. If I go full throttle, only the first 1000
> > messages are received, but if I add a usleep(1) after each send, all
> > messages are received.
>
> Default high water mark is 1000 messages, most likely the receiving
> side is too slow to process and so it starts dropping messages
>
>
You're absolutely right. I was put off by the documentation that said:
The default *ZMQ_SNDHWM* value of zero means "no limit".

Shame on me for not trying though :/

I understand that setting the HWM is different depending on the usage
pattern. However, I can't find much information on the different criteria
that should be considered.
I've come up with these:
Typical and max packet size, network restrictions, number of packets,
available memory, CPU

My application will run on a LAN on modern PCs with >=8GiB memory, up to a
few kB per message and only a burst of a few thousand messages once in a
while. What value is reasonable in such a case?
I also tested with checking the return value of send() but it returned true
regardless of sending or not. Are there other ways of being notified that
messages have been dropped?

JC


Re: [zeromq-dev] Only first X messages received with PGM/0MQ c++ setup

2018-04-16 Thread Luca Boccassi
On Mon, 2018-04-16 at 13:54 +0200, Johnny Berentsen wrote:
> 2018-04-02 15:30 GMT+02:00 Johnny Berentsen :
> 
> > Hi all
> > 
> > 
> 
> So I made a small example showing my problem. Attached server.cc and
> client.cc at the end.
> Now I have a vector with size 2500 of std::strings filled with 4096 #
> characters. Every ten seconds I loop through the vector and send the
> data over an EPGM socket. If I go full throttle, only the first 1000
> messages are received, but if I add a usleep(1) after each send, all
> messages are received.
> I'm assuming there's something really simple missing in my setup, but
> I am unable to figure it out.

Default high water mark is 1000 messages, most likely the receiving
side is too slow to process and so it starts dropping messages


-- 
Kind regards,
Luca Boccassi



Re: [zeromq-dev] Only first X messages received with PGM/0MQ c++ setup

2018-04-16 Thread Johnny Berentsen
2018-04-02 15:30 GMT+02:00 Johnny Berentsen :

> Hi all
> 
>

So I made a small example showing my problem. Attached server.cc and
client.cc at the end.
Now I have a vector with size 2500 of std::strings filled with 4096 #
characters. Every ten seconds I loop through the vector and send the data
over an EPGM socket. If I go full throttle, only the first 1000 messages are
received, but if I add a usleep(1) after each send, all messages are
received.
I'm assuming there's something really simple missing in my setup, but I am
unable to figure it out.

server.cc:

#include <cstdlib>    // exit
#include <cstring>    // memcpy
#include <iostream>
#include <sstream>
#include <string>
#include <vector>
#include <unistd.h>   // sleep, usleep
#include <zmq.hpp>

int main(int argc, char *argv[])
{
    std::stringstream connstr;
    connstr << "epgm://";
    // argv[0] is the program name, so the interface argument needs argc > 1
    if( argc > 1 )   connstr << argv[1] << ";239.10.10.10:5";
    else { std::cout << "Missing argument" << std::endl; exit(1); }

    std::cout << "Initializing network to " << connstr.str() << std::endl;
    zmq::context_t *context   = new zmq::context_t(1);
    zmq::socket_t  *fplsocket = new zmq::socket_t(*context, ZMQ_PUB);
    const int rate = 100;  // 1Gb TX- and RX- rate
    fplsocket->setsockopt(ZMQ_RATE, &rate, sizeof(rate));
    fplsocket->bind(connstr.str());
    std::cout << "Network initialized" << std::endl;

    std::vector<std::string> blobs;
    int len=4096;
    for( auto i=0 ; i < 2500 ; i++ )
    {
        std::string str(len, '#');
        blobs.push_back(str);
    }

    while( 1 )
    {
        sleep(10);
        std::cout << "Sending " << blobs.size() << " messages" << std::endl;
        for( std::vector<std::string>::iterator it = blobs.begin() ; it != blobs.end() ; it++ )
        {
            zmq::message_t fplmsg(it->length());
            memcpy(fplmsg.data(), it->data(), it->length());
            fplsocket->send(fplmsg);
            // usleep(1);
        }
    }
}

client.cc:
---
#include <cstdlib>    // exit
#include <iostream>
#include <sstream>
#include <string>
#include <zmq.hpp>

int main(int argc, char *argv[])
{
    std::stringstream connstr;
    connstr << "epgm://";
    // argv[0] is the program name, so the interface argument needs argc > 1
    if( argc > 1 )   connstr << argv[1] << ";239.10.10.10:5";
    else { std::cout << "Missing argument" << std::endl; exit(1); }

    std::cout << "Initializing network to " << connstr.str() << std::endl;
    zmq::context_t *context   = new zmq::context_t(1);
    zmq::socket_t  *fplsocket = new zmq::socket_t(*context, ZMQ_SUB);
    fplsocket->setsockopt( ZMQ_SUBSCRIBE, "", 0 );
    fplsocket->connect(connstr.str());
    std::cout << "Network initialized" << std::endl;

    int i=0;
    int rc;
    zmq::message_t resultset;
    while( 1 )
    {
        // recv() fills the message passed to it
        if( (rc = fplsocket->recv(&resultset)) == true)
        {
            i++;
            std::string msg_str(static_cast<char*>(resultset.data()), resultset.size());
            std::cout << "Received msg " << i << " with size: " << msg_str.length() << std::endl;
        }
    }
}

Compilation:
g++ -c -m64 -pipe -g -std=gnu++1y -Wall -W -D_REENTRANT -fPIC \
    -DZMQ_HAVE_OPENPGM -isystem /usr/include/pgm-4.2 -o server.o server.cc
g++ -m64 -o server server.o -lzmq -lpgm -lprotobuf
g++ -c -m64 -pipe -g -std=gnu++1y -Wall -W -D_REENTRANT -fPIC \
    -DZMQ_HAVE_OPENPGM -isystem /usr/include/pgm-4.2 -o client.o client.cc
g++ -m64 -o client client.o -lzmq -lpgm -lprotobuf

# Start on machine 1
./server 
# Start on machine 2
./client 


[zeromq-dev] Only first X messages received with PGM/0MQ c++ setup

2018-04-02 Thread Johnny Berentsen
Hi all

I've just started dabbling with a setup (c++) of OpenPGM/ZeroMQ + protobuf
so be gentle :)

Everything seems to work as expected except I find that only the first X
number of messages are sent when sending Y number of messages rapidly.

I don't have a small test example ready, but here is the important stuff:
Server:
m_cmdsocket = new zmq::socket_t(*m_context, ZMQ_PULL);
m_cmdsocket->setsockopt( ZMQ_RCVTIMEO, 100 );
m_cmdsocket->bind("tcp://*:5557");

m_fplsocket  = new zmq::socket_t(*m_context, ZMQ_PUB);
const int rate = 100;  // 1Gb TX- and RX- rate
m_fplsocket->setsockopt(ZMQ_RATE, &rate, sizeof(rate));
m_fplsocket->bind("epgm://vboxnet0;239.10.10.10:50002");
m_fplsocket->bind("ipc:///tmp/InNOVA/3105");


Client:
m_cmdsocket = new zmq::socket_t(*m_context, ZMQ_PUSH);
m_cmdsocket->connect("tcp://192.168.56.1:5557");

m_fplsocket  = new zmq::socket_t(*m_context, ZMQ_PUB);
m_fplsocket->setsockopt( ZMQ_RCVTIMEO, 100 );
const int rate = 100;  // 1Gb TX- and RX- rate
m_fplsocket->setsockopt(ZMQ_RATE, &rate, sizeof(rate));
m_fplsocket->bind("epgm://lanB;239.10.10.10:50002");

I'll try to make up a compilable small version if necessary.
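
A note on units for the ZMQ_RATE setting above: the libzmq socket option
documentation gives ZMQ_RATE in kilobits per second, so a value of 100 as
shown would mean 100 kbit/s rather than 1 Gbit/s. The sketch below, with
the hypothetical helper names kbps_from_gbps and burst_millis, shows the
conversion and how long a burst would take at a given cap. It counts
payload bytes only and ignores PGM/UDP/IP header overhead.

```cpp
#include <cstdint>

// Convert a rate in gigabits per second to the kilobits per second
// that ZMQ_RATE expects (1 Gbit/s = 1000000 kbit/s).
constexpr std::int64_t kbps_from_gbps(std::int64_t gigabits_per_second)
{
    return gigabits_per_second * 1000 * 1000;
}

// Whole milliseconds needed to push a burst through a link capped at
// rate_kbps: bits / (kbit/s) comes out directly in milliseconds.
constexpr std::int64_t burst_millis(std::int64_t messages,
                                    std::int64_t bytes_each,
                                    std::int64_t rate_kbps)
{
    return messages * bytes_each * 8 / rate_kbps;
}
```

For 2500 messages of 6160 bytes, burst_millis(2500, 6160, 100) is 1232000
ms (over 20 minutes), while burst_millis(2500, 6160, kbps_from_gbps(1)) is
123 ms.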

So this is how it currently works:
A server and a client process are started and exchange some messages on both
channels (PUSH to PULL and PUB to SUB), and once we have stable
communication the server will try to transmit 2500 protobuf messages. The
first ~1995 messages are received by the client but the remaining are lost.
If I add a one millisecond delay
(std::this_thread::sleep_for(std::chrono::milliseconds(1));) all messages
are received. This is when running on same host (i.e. using IPC). If
sending between the host and a virtual machine, the version without delay
drops down to ~30 received messages, but gets all messages with the delay.
The protobuf message consists of some short strings and numbers and a large
random bitmask with a total size of 6160 bytes.

I'm assuming some kind of queue buffer on the sender side is full and the
remaining messages are discarded, but I am not sure how I should go about
handling this. This might be there somewhere in the docs in big letters,
but has still slipped through :/ The discussions I've found concern tcp
pub/sub and HWM so I'm not sure how that relates to IPC/PGM pub/sub.

Also note that everything happens in one thread, so the receive side is not
blocking and will timeout after 100ms and continue with printing a line and
set some timer and then start receive again.

The sending loop is like this:
for each object:
    std::string protomsg;
    fpl.SerializeToString(&protomsg);
    int msgsize = protomsg.length();

    zmq::message_t fplmsg(msgsize);
    memcpy(fplmsg.data(), protomsg.data(), msgsize);
    m_fplsocket->send(fplmsg);

Thanks
Johnny