2018-04-02 15:30 GMT+02:00 Johnny Berentsen <[email protected]>:
> Hi all
> <snip initial explanation>
>
So I made a small example showing my problem. Attached server.cc and
client.cc at the end.
Now I have a vector of 2500 std::strings, each filled with 4096 '#'
characters. Every ten seconds I loop through the vector and send the data
over an EPGM socket. If I go full throttle, only the first 1000 messages are
received, but if I add a usleep(1) after each send, all messages are
received.
I'm assuming there's something really simple missing in my setup, but I am
unable to figure it out.
server.cc:
------------
#include <unistd.h>
#include <cstddef>
#include <cstring>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>
#include <zmq.hpp>
int main(int argc, char *argv[])
{
std::stringstream connstr;
connstr << "epgm://";
if( argc > 0 ) connstr << argv[1] << ";239.10.10.10:50000";
else { std::cout << "Missing argument" << std::endl;
exit(1); }
std::cout << "Initializing network to " << connstr.str() << std::endl;
zmq::context_t *context = new zmq::context_t(1);
zmq::socket_t *fplsocket = new zmq::socket_t(*context, ZMQ_PUB);
const int rate = 1000000; // 1Gb TX- and
RX- rate
fplsocket->setsockopt(ZMQ_RATE, &rate, sizeof(rate));
fplsocket->bind(connstr.str());
std::cout << "Network initialized" << std::endl;
std::vector<std::string> blobs;
int len=4096;
for( auto i=0 ; i < 2500 ; i++ )
{
std::string str(len, '#');
blobs.push_back(str);
}
while( 1 )
{
sleep(10);
std::cout << "Sending " << blobs.size() << " messages" << std::endl;
for( std::vector<std::string>::iterator it = blobs.begin() ; it !=
blobs.end() ; it++ )
{
zmq::message_t fplmsg(it->length());
memcpy(fplmsg.data(), it->data(), it->length());
fplsocket->send(fplmsg);
// usleep(1);
}
}
}
client.cc:
-----------
#include <iostream>
#include <sstream>
#include <unistd.h>
#include <zmq.hpp>
// client: subscribes to every message on the EPGM (multicast) group and
// prints a running count together with the size of each message received.
//
// Usage: client <interface name>
//
// Returns 1 if the interface argument is missing; otherwise loops forever.
int main(int argc, char *argv[])
{
    // argv[0] is the program name, so argc is at least 1 even when no
    // interface was given. Require a real first argument before reading
    // argv[1], which is a null pointer otherwise.
    if (argc < 2) {
        std::cout << "Missing argument" << std::endl;
        return 1;
    }

    std::stringstream connstr;
    connstr << "epgm://" << argv[1] << ";239.10.10.10:50000";
    std::cout << "Initializing network to " << connstr.str() << std::endl;

    // Automatic storage: context and socket are released by their destructors.
    zmq::context_t context(1);
    zmq::socket_t fplsocket(context, ZMQ_SUB);

    // An empty prefix subscribes to all messages.
    fplsocket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
    // NOTE(review): ZMQ_RCVHWM is left at its default (1000 messages); raise
    // it before connect() if this loop cannot keep up with a burst.
    fplsocket.connect(connstr.str());
    std::cout << "Network initialized" << std::endl;

    int received = 0;
    zmq::message_t resultset;
    while (true) {
        if (fplsocket.recv(&resultset)) {
            ++received;
            // Only the size is reported, so the payload is not copied out of
            // the message.
            std::cout << "Received msg " << received << " with size: "
                      << resultset.size() << std::endl;
        }
    }
}
Compilation:
g++ -c -m64 -pipe -g -std=gnu++1y -Wall -W -D_REENTRANT -fPIC
-DZMQ_HAVE_OPENPGM -isystem /usr/include/pgm-4.2 -o server.o server.cc
g++ -m64 -o server server.o -lzmq -lpgm -lprotobuf
g++ -c -m64 -pipe -g -std=gnu++1y -Wall -W -D_REENTRANT -fPIC
-DZMQ_HAVE_OPENPGM -isystem /usr/include/pgm-4.2 -o client.o client.cc
g++ -m64 -o client client.o -lzmq -lpgm -lprotobuf
# Start on machine 1
./server <interface name>
# Start on machine 2
./client <interface name>
_______________________________________________
zeromq-dev mailing list
[email protected]
https://lists.zeromq.org/mailman/listinfo/zeromq-dev