2018-04-02 15:30 GMT+02:00 Johnny Berentsen <johnny.berent...@gmail.com>:

> Hi all
> <snip initial explanation>
>

So I made a small example showing my problem. Attached server.cc and
client.cc at the end.
Now I have a vector with size 2500 of std::strings filled with 4096 #
characters. Every ten seconds I loop through the vector and send the data
over a EPGM socket. If I go full throttle, only the first 1000 messages are
received, but if I add a usleep(1) after each send, all messages are
received.
I'm assuming there's something really simple missing in my setup, but I am
unable to figure it out.

server.cc:
------------
#include <iostream>
#include <sstream>
#include <unistd.h>
#include <zmq.hpp>

int main(int argc, char *argv[])
{
    std::stringstream connstr;
    connstr << "epgm://";
    if( argc > 0 )   connstr << argv[1] << ";239.10.10.10:50000";
    else             { std::cout << "Missing argument" << std::endl;
exit(1); }

    std::cout << "Initializing network to " << connstr.str() << std::endl;
    zmq::context_t *context   = new zmq::context_t(1);
    zmq::socket_t  *fplsocket = new zmq::socket_t(*context, ZMQ_PUB);
    const int rate = 1000000;                              // 1Gb TX- and
RX- rate
    fplsocket->setsockopt(ZMQ_RATE, &rate, sizeof(rate));
    fplsocket->bind(connstr.str());
    std::cout << "Network initialized" << std::endl;

    std::vector<std::string> blobs;
    int len=4096;
    for( auto i=0 ; i < 2500 ; i++ )
    {
        std::string str(len, '#');
        blobs.push_back(str);
    }

    while( 1 )
    {
        sleep(10);
        std::cout << "Sending " << blobs.size() << " messages" << std::endl;
        for( std::vector<std::string>::iterator it = blobs.begin() ; it !=
blobs.end() ; it++ )
        {
            zmq::message_t fplmsg(it->length());
            memcpy(fplmsg.data(), it->data(), it->length());
            fplsocket->send(fplmsg);
            // usleep(1);
        }
    }
}

client.cc:
-----------
#include <iostream>
#include <sstream>
#include <unistd.h>
#include <zmq.hpp>

int main(int argc, char *argv[])
{
    std::stringstream connstr;
    connstr << "epgm://";
    if( argc > 0 )   connstr << argv[1] << ";239.10.10.10:50000";
    else             { std::cout << "Missing argument" << std::endl;
exit(1); }

    std::cout << "Initializing network to " << connstr.str() << std::endl;
    zmq::context_t *context   = new zmq::context_t(1);
    zmq::socket_t  *fplsocket = new zmq::socket_t(*context, ZMQ_SUB);
    fplsocket->setsockopt( ZMQ_SUBSCRIBE, "", 0 );
    fplsocket->connect(connstr.str());
    std::cout << "Network initialized" << std::endl;

    int i=0;
    int rc;
    zmq::message_t resultset;
    while( 1 )
    {
        if( (rc = fplsocket->recv(&resultset)) == true)
        {
            i++;
            std::string msg_str(static_cast<char*>(resultset.data()),
resultset.size());
            std::cout << "Received msg " << i << " with size: " <<
msg_str.length() << std::endl;
        }
    }
}

Compilation:
g++ -c -m64 -pipe -g -std=gnu++1y -Wall -W -D_REENTRANT -fPIC
-DZMQ_HAVE_OPENPGM -isystem /usr/include/pgm-4.2 -o server.o server.cc
g++ -m64 -o server server.o -lzmq -lpgm -lprotobuf
g++ -c -m64 -pipe -g -std=gnu++1y -Wall -W -D_REENTRANT -fPIC
-DZMQ_HAVE_OPENPGM -isystem /usr/include/pgm-4.2 -o client.o client.cc
g++ -m64 -o client client.o -lzmq -lpgm -lprotobuf

# Start on machine 1
./server <interface name>
# Start on machine 2
./client <interface name>
_______________________________________________
zeromq-dev mailing list
zeromq-dev@lists.zeromq.org
https://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to