On Mon, 2018-04-16 at 13:54 +0200, Johnny Berentsen wrote: > 2018-04-02 15:30 GMT+02:00 Johnny Berentsen <johnny.berentsen@gmail.c > om>: > > > Hi all > > <snip initial explanation> > > > > So I made a small example showing my problem. Attached server.cc and > client.cc at the end. > Now I have a vector with size 2500 of std::strings filled with 4096 # > characters. Every ten seconds I loop through the vector and send the > data > over a EPGM socket. If I go full throttle, only the first 1000 > messages are > received, but if I add a usleep(1) after each send, all messages are > received. > I'm assuming there's something really simple missing in my setup, but > I am > unable to figure it out.
Default high water mark is 1000 messages, most likely the receiving side is too slow to process and so it starts dropping messages > server.cc: > ------------ > #include <iostream> > #include <sstream> > #include <unistd.h> > #include <zmq.hpp> > > int main(int argc, char *argv[]) > { > std::stringstream connstr; > connstr << "epgm://"; > if( argc > 0 ) connstr << argv[1] << ";239.10.10.10:50000"; > else { std::cout << "Missing argument" << std::endl; > exit(1); } > > std::cout << "Initializing network to " << connstr.str() << > std::endl; > zmq::context_t *context = new zmq::context_t(1); > zmq::socket_t *fplsocket = new zmq::socket_t(*context, ZMQ_PUB); > const int rate = 1000000; // 1Gb TX- > and > RX- rate > fplsocket->setsockopt(ZMQ_RATE, &rate, sizeof(rate)); > fplsocket->bind(connstr.str()); > std::cout << "Network initialized" << std::endl; > > std::vector<std::string> blobs; > int len=4096; > for( auto i=0 ; i < 2500 ; i++ ) > { > std::string str(len, '#'); > blobs.push_back(str); > } > > while( 1 ) > { > sleep(10); > std::cout << "Sending " << blobs.size() << " messages" << > std::endl; > for( std::vector<std::string>::iterator it = blobs.begin() ; > it != > blobs.end() ; it++ ) > { > zmq::message_t fplmsg(it->length()); > memcpy(fplmsg.data(), it->data(), it->length()); > fplsocket->send(fplmsg); > // usleep(1); > } > } > } > > client.cc: > ----------- > #include <iostream> > #include <sstream> > #include <unistd.h> > #include <zmq.hpp> > > int main(int argc, char *argv[]) > { > std::stringstream connstr; > connstr << "epgm://"; > if( argc > 0 ) connstr << argv[1] << ";239.10.10.10:50000"; > else { std::cout << "Missing argument" << std::endl; > exit(1); } > > std::cout << "Initializing network to " << connstr.str() << > std::endl; > zmq::context_t *context = new zmq::context_t(1); > zmq::socket_t *fplsocket = new zmq::socket_t(*context, ZMQ_SUB); > fplsocket->setsockopt( ZMQ_SUBSCRIBE, "", 0 ); > fplsocket->connect(connstr.str()); > std::cout << "Network initialized" << std::endl; > > int i=0; > int rc; > zmq::message_t resultset; > while( 1 ) > { > if( (rc = fplsocket->recv(&resultset)) == true) > { > i++; > std::string msg_str(static_cast<char*>(resultset.data()), > resultset.size()); > std::cout << "Received msg " << i << " with size: " << > msg_str.length() << std::endl; > } > } > } > > Compilation: > g++ -c -m64 -pipe -g -std=gnu++1y -Wall -W -D_REENTRANT -fPIC > -DZMQ_HAVE_OPENPGM -isystem /usr/include/pgm-4.2 -o server.o > server.cc > g++ -m64 -o server server.o -lzmq -lpgm -lprotobuf > g++ -c -m64 -pipe -g -std=gnu++1y -Wall -W -D_REENTRANT -fPIC > -DZMQ_HAVE_OPENPGM -isystem /usr/include/pgm-4.2 -o client.o > client.cc > g++ -m64 -o client client.o -lzmq -lpgm -lprotobuf > > # Start on machine 1 > ./server <interface name> > # Start on machine 2 > ./client <interface name> > _______________________________________________ > zeromq-dev mailing list > zeromq-dev@lists.zeromq.org > https://lists.zeromq.org/mailman/listinfo/zeromq-dev -- Kind regards, Luca Boccassi
signature.asc
Description: This is a digitally signed message part
_______________________________________________ zeromq-dev mailing list zeromq-dev@lists.zeromq.org https://lists.zeromq.org/mailman/listinfo/zeromq-dev