On Mon, 2018-04-16 at 13:54 +0200, Johnny Berentsen wrote:
> 2018-04-02 15:30 GMT+02:00 Johnny Berentsen <[email protected]>:
>
> > Hi all
> > <snip initial explanation>
>
> So I made a small example showing my problem. Attached server.cc and
> client.cc at the end.
> Now I have a vector with size 2500 of std::strings filled with 4096 #
> characters. Every ten seconds I loop through the vector and send the data
> over an EPGM socket. If I go full throttle, only the first 1000 messages are
> received, but if I add a usleep(1) after each send, all messages are
> received.
> I'm assuming there's something really simple missing in my setup, but I am
> unable to figure it out.

The default high water mark is 1000 messages. Most likely the receiving
side is too slow to process the burst, so messages start being dropped
once that limit is reached.

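To illustrate why a limit of 1000 would produce exactly the symptom described (first 1000 messages arrive, the rest vanish), here is a rough model of a queue that drops anything arriving while it is full. This is only a sketch and not libzmq code: the function name `delivered_in_burst` and the `drained_per_send` parameter are made up for the example, and a real PGM receive path is more involved than a single counter.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical model, not libzmq code: returns how many of `total` messages
// get through a queue that holds at most `hwm` messages and silently drops
// any message that arrives while it is full. After each send the consumer
// removes up to `drained_per_send` messages from the queue.
std::size_t delivered_in_burst(std::size_t total, std::size_t hwm,
                               std::size_t drained_per_send)
{
    assert(hwm > 0);
    std::size_t queued = 0;     // messages currently waiting in the queue
    std::size_t delivered = 0;  // messages accepted (and so eventually read)

    for (std::size_t i = 0; i < total; ++i) {
        if (queued < hwm) {
            ++queued;           // accepted into the queue
            ++delivered;
        }
        // else: queue is at the high water mark, the message is dropped

        // Consumer drains what it can before the next message arrives.
        const std::size_t drained =
            drained_per_send < queued ? drained_per_send : queued;
        queued -= drained;
    }
    return delivered;
}
```

In this model, `delivered_in_burst(2500, 1000, 0)` (consumer does not keep up at all during the burst) gives 1000, while letting the consumer drain one message per send, which is roughly what the usleep(1) buys, gives all 2500. Raising the limit to the burst size has the same effect. In libzmq the corresponding options are ZMQ_SNDHWM and ZMQ_RCVHWM; they need to be set before bind/connect, and whether raising them is enough for this particular EPGM setup is something to verify by testing.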
> server.cc:
> ------------
> #include <cstdlib>
> #include <cstring>
> #include <iostream>
> #include <sstream>
> #include <string>
> #include <vector>
> #include <unistd.h>
> #include <zmq.hpp>
>
> int main(int argc, char *argv[])
> {
>     std::stringstream connstr;
>     connstr << "epgm://";
>     // argv[1] is only valid when at least one argument was passed
>     if( argc > 1 ) connstr << argv[1] << ";239.10.10.10:50000";
>     else { std::cout << "Missing argument" << std::endl; exit(1); }
>
>     std::cout << "Initializing network to " << connstr.str() << std::endl;
>     zmq::context_t *context = new zmq::context_t(1);
>     zmq::socket_t *fplsocket = new zmq::socket_t(*context, ZMQ_PUB);
>     const int rate = 1000000; // 1 Gb/s TX and RX rate (ZMQ_RATE is in kbit/s)
>     fplsocket->setsockopt(ZMQ_RATE, &rate, sizeof(rate));
>     fplsocket->bind(connstr.str());
>     std::cout << "Network initialized" << std::endl;
>
>     // 2500 payloads of 4096 '#' characters each
>     std::vector<std::string> blobs;
>     int len=4096;
>     for( auto i=0 ; i < 2500 ; i++ )
>     {
>         std::string str(len, '#');
>         blobs.push_back(str);
>     }
>
>     while( 1 )
>     {
>         sleep(10);
>         std::cout << "Sending " << blobs.size() << " messages" << std::endl;
>         for( std::vector<std::string>::iterator it = blobs.begin() ; it != blobs.end() ; it++ )
>         {
>             zmq::message_t fplmsg(it->length());
>             memcpy(fplmsg.data(), it->data(), it->length());
>             fplsocket->send(fplmsg);
>             // usleep(1);
>         }
>     }
> }
>
> client.cc:
> -----------
> #include <cstdlib>
> #include <iostream>
> #include <sstream>
> #include <string>
> #include <unistd.h>
> #include <zmq.hpp>
>
> int main(int argc, char *argv[])
> {
>     std::stringstream connstr;
>     connstr << "epgm://";
>     // argv[1] is only valid when at least one argument was passed
>     if( argc > 1 ) connstr << argv[1] << ";239.10.10.10:50000";
>     else { std::cout << "Missing argument" << std::endl; exit(1); }
>
>     std::cout << "Initializing network to " << connstr.str() << std::endl;
>     zmq::context_t *context = new zmq::context_t(1);
>     zmq::socket_t *fplsocket = new zmq::socket_t(*context, ZMQ_SUB);
>     fplsocket->setsockopt( ZMQ_SUBSCRIBE, "", 0 ); // subscribe to everything
>     fplsocket->connect(connstr.str());
>     std::cout << "Network initialized" << std::endl;
>
>     int i=0;
>     int rc;
>     zmq::message_t resultset;
>     while( 1 )
>     {
>         if( (rc = fplsocket->recv(&resultset)) == true)
>         {
>             i++;
>             std::string msg_str(static_cast<char*>(resultset.data()), resultset.size());
>             std::cout << "Received msg " << i << " with size: " << msg_str.length() << std::endl;
>         }
>     }
> }
>
> Compilation:
> g++ -c -m64 -pipe -g -std=gnu++1y -Wall -W -D_REENTRANT -fPIC \
>     -DZMQ_HAVE_OPENPGM -isystem /usr/include/pgm-4.2 -o server.o server.cc
> g++ -m64 -o server server.o -lzmq -lpgm -lprotobuf
> g++ -c -m64 -pipe -g -std=gnu++1y -Wall -W -D_REENTRANT -fPIC \
>     -DZMQ_HAVE_OPENPGM -isystem /usr/include/pgm-4.2 -o client.o client.cc
> g++ -m64 -o client client.o -lzmq -lpgm -lprotobuf
>
> # Start on machine 1
> ./server <interface name>
> # Start on machine 2
> ./client <interface name>
> _______________________________________________
> zeromq-dev mailing list
> [email protected]
> https://lists.zeromq.org/mailman/listinfo/zeromq-dev
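
For a sense of scale, the burst in the quoted server.cc can be worked out from its own constants: 2500 messages of 4096 bytes, with ZMQ_RATE set to 1000000 (libzmq takes this option in kilobits per second). The helper names below are made up for the example, and the figure counts payload bytes only, ignoring PGM/UDP/IP headers, so treat it as a back-of-the-envelope estimate.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper: total payload bytes in one burst.
std::uint64_t burst_bytes(std::uint64_t messages, std::uint64_t bytes_each)
{
    return messages * bytes_each;
}

// Hypothetical helper: whole milliseconds needed to push `bytes` of payload
// at `rate_kbit` kilobits per second (the unit ZMQ_RATE uses).
// bits / (rate_kbit * 1000 bit/s) seconds == bits / rate_kbit milliseconds.
// Integer division, so the result is rounded down.
std::uint64_t burst_millis(std::uint64_t bytes, std::uint64_t rate_kbit)
{
    assert(rate_kbit > 0);
    return bytes * 8 / rate_kbit;
}
```

With the values from server.cc, `burst_bytes(2500, 4096)` is 10240000 bytes, and `burst_millis(10240000, 1000000)` is 81, i.e. the whole burst fits in under a tenth of a second at the configured rate. That leaves the receiver very little time to drain its queue before 1000 messages have piled up.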
--
Kind regards,
Luca Boccassi
