Re: [zeromq-dev] Only first X messages received with PGM/0MQ c++ setup
On Mon, 2018-04-16 at 18:25 +0200, Johnny Berentsen wrote: > 2018-04-16 14:35 GMT+02:00 Luca Boccassi: > > > On Mon, 2018-04-16 at 13:54 +0200, Johnny Berentsen wrote: > > > 2018-04-02 15:30 GMT+02:00 Johnny Berentsen > > il.c > > > om>: > > > > > > > Hi all > > > > > > > > > > > > > > Now I have a vector with size 2500 of std::strings filled with > > > 4096 # > > > characters. Every ten seconds I loop through the vector and send > > > the > > > data > > > over a EPGM socket. If I go full throttle, only the first 1000 > > > messages are > > > received, but if I add a usleep(1) after each send, all messages > > > are > > > received. > > > > Default high water mark is 1000 messages, most likely the receiving > > side is too slow to process and so it starts dropping messages > > > > > > You're absolutely right. I was put off by the documentation that > said: > The default *ZMQ_SNDHWM* value of zero means "no limit". > > Shame on me for not trying though :/ > > I understand that setting the HWM is different depending on the usage > pattern. However, I can't find much information on the different > criterias > that should be considered. > I've come up with these: > Typical and max packet size, network restrictions, number of packets, > available memory, CPU > > My application will run on a LAN on modern PC's with >=8GiB memory, > up to a > few kB's per message and only a burst of a few thousand messages once > in a > while. What value is reasonable in such a case? It mostly depend on your application and how quickly it can pull messages off the socket. So the only way is to try and test it. > I also tested with checking the return value of send() but it > returned true > regardless of sending or not. Are there other ways of being noticed > that > messages have been dropped? No, that's the whole point about pub-sub - if you don't want a lossy mechanism then you should use a different pattern. Or you can set unlimited water marks and deal with the unbounded memory usage. -- Kind regards, Luca Boccassi signature.asc Description: This is a digitally signed message part ___ zeromq-dev mailing list zeromq-dev@lists.zeromq.org https://lists.zeromq.org/mailman/listinfo/zeromq-dev
Re: [zeromq-dev] Only first X messages received with PGM/0MQ c++ setup
2018-04-16 14:35 GMT+02:00 Luca Boccassi: > On Mon, 2018-04-16 at 13:54 +0200, Johnny Berentsen wrote: > > 2018-04-02 15:30 GMT+02:00 Johnny Berentsen > om>: > > > > > Hi all > > > > > > > > > > Now I have a vector with size 2500 of std::strings filled with 4096 # > > characters. Every ten seconds I loop through the vector and send the > > data > > over a EPGM socket. If I go full throttle, only the first 1000 > > messages are > > received, but if I add a usleep(1) after each send, all messages are > > received. > > Default high water mark is 1000 messages, most likely the receiving > side is too slow to process and so it starts dropping messages > > You're absolutely right. I was put off by the documentation that said: The default *ZMQ_SNDHWM* value of zero means "no limit". Shame on me for not trying though :/ I understand that setting the HWM is different depending on the usage pattern. However, I can't find much information on the different criterias that should be considered. I've come up with these: Typical and max packet size, network restrictions, number of packets, available memory, CPU My application will run on a LAN on modern PC's with >=8GiB memory, up to a few kB's per message and only a burst of a few thousand messages once in a while. What value is reasonable in such a case? I also tested with checking the return value of send() but it returned true regardless of sending or not. Are there other ways of being noticed that messages have been dropped? JC ___ zeromq-dev mailing list zeromq-dev@lists.zeromq.org https://lists.zeromq.org/mailman/listinfo/zeromq-dev
Re: [zeromq-dev] Only first X messages received with PGM/0MQ c++ setup
On Mon, 2018-04-16 at 13:54 +0200, Johnny Berentsen wrote: > 2018-04-02 15:30 GMT+02:00 Johnny Berentsenom>: > > > Hi all > > > > > > So I made a small example showing my problem. Attached server.cc and > client.cc at the end. > Now I have a vector with size 2500 of std::strings filled with 4096 # > characters. Every ten seconds I loop through the vector and send the > data > over a EPGM socket. If I go full throttle, only the first 1000 > messages are > received, but if I add a usleep(1) after each send, all messages are > received. > I'm assuming there's something really simple missing in my setup, but > I am > unable to figure it out. Default high water mark is 1000 messages, most likely the receiving side is too slow to process and so it starts dropping messages > server.cc: > > #include > #include > #include > #include > > int main(int argc, char *argv[]) > { > std::stringstream connstr; > connstr << "epgm://"; > if( argc > 0 ) connstr << argv[1] << ";239.10.10.10:5"; > else { std::cout << "Missing argument" << std::endl; > exit(1); } > > std::cout << "Initializing network to " << connstr.str() << > std::endl; > zmq::context_t *context = new zmq::context_t(1); > zmq::socket_t *fplsocket = new zmq::socket_t(*context, ZMQ_PUB); > const int rate = 100; // 1Gb TX- > and > RX- rate > fplsocket->setsockopt(ZMQ_RATE, , sizeof(rate)); > fplsocket->bind(connstr.str()); > std::cout << "Network initialized" << std::endl; > > std::vector blobs; > int len=4096; > for( auto i=0 ; i < 2500 ; i++ ) > { > std::string str(len, '#'); > blobs.push_back(str); > } > > while( 1 ) > { > sleep(10); > std::cout << "Sending " << blobs.size() << " messages" << > std::endl; > for( std::vector::iterator it = blobs.begin() ; > it != > blobs.end() ; it++ ) > { > zmq::message_t fplmsg(it->length()); > memcpy(fplmsg.data(), it->data(), it->length()); > fplsocket->send(fplmsg); > // usleep(1); > } > } > } > > client.cc: > --- > #include > #include > #include > #include > > int main(int argc, char *argv[]) > { > std::stringstream connstr; > connstr << "epgm://"; > if( argc > 0 ) connstr << argv[1] << ";239.10.10.10:5"; > else { std::cout << "Missing argument" << std::endl; > exit(1); } > > std::cout << "Initializing network to " << connstr.str() << > std::endl; > zmq::context_t *context = new zmq::context_t(1); > zmq::socket_t *fplsocket = new zmq::socket_t(*context, ZMQ_SUB); > fplsocket->setsockopt( ZMQ_SUBSCRIBE, "", 0 ); > fplsocket->connect(connstr.str()); > std::cout << "Network initialized" << std::endl; > > int i=0; > int rc; > zmq::message_t resultset; > while( 1 ) > { > if( (rc = fplsocket->recv()) == true) > { > i++; > std::string msg_str(static_cast (resultset.data()), > resultset.size()); > std::cout << "Received msg " << i << " with size: " << > msg_str.length() << std::endl; > } > } > } > > Compilation: > g++ -c -m64 -pipe -g -std=gnu++1y -Wall -W -D_REENTRANT -fPIC > -DZMQ_HAVE_OPENPGM -isystem /usr/include/pgm-4.2 -o server.o > server.cc > g++ -m64 -o server server.o -lzmq -lpgm -lprotobuf > g++ -c -m64 -pipe -g -std=gnu++1y -Wall -W -D_REENTRANT -fPIC > -DZMQ_HAVE_OPENPGM -isystem /usr/include/pgm-4.2 -o client.o > client.cc > g++ -m64 -o client client.o -lzmq -lpgm -lprotobuf > > # Start on machine 1 > ./server > # Start on machine 2 > ./client > ___ > zeromq-dev mailing list > zeromq-dev@lists.zeromq.org > https://lists.zeromq.org/mailman/listinfo/zeromq-dev -- Kind regards, Luca Boccassi signature.asc Description: This is a digitally signed message part ___ zeromq-dev mailing list zeromq-dev@lists.zeromq.org https://lists.zeromq.org/mailman/listinfo/zeromq-dev
Re: [zeromq-dev] Only first X messages received with PGM/0MQ c++ setup
2018-04-02 15:30 GMT+02:00 Johnny Berentsen: > Hi all > > So I made a small example showing my problem. Attached server.cc and client.cc at the end. Now I have a vector with size 2500 of std::strings filled with 4096 # characters. Every ten seconds I loop through the vector and send the data over a EPGM socket. If I go full throttle, only the first 1000 messages are received, but if I add a usleep(1) after each send, all messages are received. I'm assuming there's something really simple missing in my setup, but I am unable to figure it out. server.cc: #include #include #include #include int main(int argc, char *argv[]) { std::stringstream connstr; connstr << "epgm://"; if( argc > 0 ) connstr << argv[1] << ";239.10.10.10:5"; else { std::cout << "Missing argument" << std::endl; exit(1); } std::cout << "Initializing network to " << connstr.str() << std::endl; zmq::context_t *context = new zmq::context_t(1); zmq::socket_t *fplsocket = new zmq::socket_t(*context, ZMQ_PUB); const int rate = 100; // 1Gb TX- and RX- rate fplsocket->setsockopt(ZMQ_RATE, , sizeof(rate)); fplsocket->bind(connstr.str()); std::cout << "Network initialized" << std::endl; std::vector blobs; int len=4096; for( auto i=0 ; i < 2500 ; i++ ) { std::string str(len, '#'); blobs.push_back(str); } while( 1 ) { sleep(10); std::cout << "Sending " << blobs.size() << " messages" << std::endl; for( std::vector::iterator it = blobs.begin() ; it != blobs.end() ; it++ ) { zmq::message_t fplmsg(it->length()); memcpy(fplmsg.data(), it->data(), it->length()); fplsocket->send(fplmsg); // usleep(1); } } } client.cc: --- #include #include #include #include int main(int argc, char *argv[]) { std::stringstream connstr; connstr << "epgm://"; if( argc > 0 ) connstr << argv[1] << ";239.10.10.10:5"; else { std::cout << "Missing argument" << std::endl; exit(1); } std::cout << "Initializing network to " << connstr.str() << std::endl; zmq::context_t *context = new zmq::context_t(1); zmq::socket_t *fplsocket = new zmq::socket_t(*context, ZMQ_SUB); fplsocket->setsockopt( ZMQ_SUBSCRIBE, "", 0 ); fplsocket->connect(connstr.str()); std::cout << "Network initialized" << std::endl; int i=0; int rc; zmq::message_t resultset; while( 1 ) { if( (rc = fplsocket->recv()) == true) { i++; std::string msg_str(static_cast (resultset.data()), resultset.size()); std::cout << "Received msg " << i << " with size: " << msg_str.length() << std::endl; } } } Compilation: g++ -c -m64 -pipe -g -std=gnu++1y -Wall -W -D_REENTRANT -fPIC -DZMQ_HAVE_OPENPGM -isystem /usr/include/pgm-4.2 -o server.o server.cc g++ -m64 -o server server.o -lzmq -lpgm -lprotobuf g++ -c -m64 -pipe -g -std=gnu++1y -Wall -W -D_REENTRANT -fPIC -DZMQ_HAVE_OPENPGM -isystem /usr/include/pgm-4.2 -o client.o client.cc g++ -m64 -o client client.o -lzmq -lpgm -lprotobuf # Start on machine 1 ./server # Start on machine 2 ./client ___ zeromq-dev mailing list zeromq-dev@lists.zeromq.org https://lists.zeromq.org/mailman/listinfo/zeromq-dev
[zeromq-dev] Only first X messages received with PGM/0MQ c++ setup
Hi all I've just started dabbling with a setup (c++) of OpenPGM/ZeroMQ + protobuf so be gentle :) Everything seems to work as expected except I find that only the first X number of messages are sent when sending Y number of messages rapidly. I don't have a small test example ready, but here is the important stuff: Server: m_cmdsocket = new zmq::socket_t(*m_context, ZMQ_PULL); m_cmdsocket->setsockopt( ZMQ_RCVTIMEO, 100 ); m_cmdsocket->bind("tcp://*:5557"); m_fplsocket = new zmq::socket_t(*m_context, ZMQ_PUB); const int rate = 100; // 1Gb TX- and RX- rate m_fplsocket->setsockopt(ZMQ_RATE, , sizeof(rate)); m_fplsocket->bind("epgm://vboxnet0;239.10.10.10:50002"); m_fplsocket->bind("ipc:///tmp/InNOVA/3105"); Client: m_cmdsocket = new zmq::socket_t(*m_context, ZMQ_PUSH); m_cmdsocket->connect("tcp://192.168.56.1:5557"); m_fplsocket = new zmq::socket_t(*m_context, ZMQ_PUB); m_fplsocket->setsockopt( ZMQ_RCVTIMEO, 100 ); const int rate = 100; // 1Gb TX- and RX- rate m_fplsocket->setsockopt(ZMQ_RATE, , sizeof(rate)); m_fplsocket->bind("epgm://lanB;239.10.10.10:50002"); I'll try to make up a compilable small version if necessary. So this is how it currently works: A server and client process is started and exchanges some messages on both channels (PUSH to PULL and PUB to SUB), and once we have stable communication the server will try to transmit 2500 protobuf messages. The first ~1995 messages are received by the client but the remaining are lost. If I add a one millisecond delay (std::this_thread::sleep_for(std::chrono::milliseconds(1));) all messages are received. This is when running on same host (i.e. using IPC). If sending between the host and a virtual machine, the version without delay drops down to ~30 received messages, but gets all messages with the delay. The protobuf message consists of some short strings and numbers and a large random bitmask with a total size of 6160 bytes. I'm assuming some kind of queue buffer on the sender side is full and the remaining messages are discarded, but I am not sure how I should go about handling this. This might be there somewhere in the docs in big letters, but has still slipped through :/ The discussions I've found concerns tcp pub/sub and HWM so I'm not sure how that relates to IPC/PGM pub/sub. Also note that everything happens in one thread, so the receive side is not blocking and will timeout after 100ms and continue with printing a line and set some timer and then start receive again. The sending loop is like this: for each object: std::string protomsg; fpl.SerializeToString(); int msgsize = protomsg.length(); zmq::message_t fplmsg(msgsize); memcpy(fplmsg.data(), protomsg.data(), msgsize); m_fplsocket->send(*msg); Thanks Johnny ___ zeromq-dev mailing list zeromq-dev@lists.zeromq.org https://lists.zeromq.org/mailman/listinfo/zeromq-dev