Hi Pieter,

Thanks for the response. Attached is a test case where setting ZMQ_HWM doesn't 
work. Setting ZMQ_HWM to 1024 with 1 KiB messages should limit the queue's 
size to ~1 MiB, right? Yet "top" still reports around 1 GiB of memory usage when 
sending that amount of data.
This is with ZMQ v.2.1.10.

Cheers,
Clemens


On Friday 18 November 2011 08.14:49 Pieter Hintjens wrote:
> On Fri, Nov 18, 2011 at 3:59 AM, Clemens Lutz <[email protected]> wrote:
> > I've been writing a benchmark to verify ZeroMQ's performance is near
> > line-rate on the machines I'm using. However, I keep running into the
> > issue of ZMQ allocating memory on the sender's side equal to the amount
> > I'm sending. If I want to send more data than I have physical memory
> > the program will exit with an "out of memory" error.
> 
> zmq_send is asynchronous, so does not send right away but queues the
> message in memory. Actual sending is limited by the network capacity.
> So messages will build up. Setting ZMQ_HWM on the sender socket should
> limit the number of messages. If you have a test case where setting
> ZMQ_HWM doesn't work, please post it.
> 
> -Pieter
> _______________________________________________
> zeromq-dev mailing list
> [email protected]
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
#include <iostream>
#include <zmq.hpp>
#include <string>
#include <stdint.h>

// High water mark handed to the sending socket via ZMQ_HWM.
#define HWM_VAL     1024

// In-band end-of-stream marker written into the final message.
// <cstdio> defines its own EOF macro; drop any earlier definition first so
// this string marker does not cause a macro-redefinition diagnostic when that
// header is pulled in transitively.
#ifdef EOF
#undef EOF
#endif
#define EOF         "EOF"
// Binary size units, each one KILO_STEP times the previous.
#define KILO_STEP   1024
#define KBYTE       KILO_STEP
#define MBYTE       (KBYTE * KILO_STEP)
#define GBYTE       (MBYTE * KILO_STEP)

using namespace std;

int main()
{
    const string strEOF = EOF;
    const string address = "tcp://localhost:5555";
    const uint64_t msgSize = KBYTE;
    const uint64_t transferSize = GBYTE;
    const uint64_t hwm = HWM_VAL;
    uint64_t i, numIter;
    zmq::context_t* ctx;
    zmq::socket_t* socket;

    numIter = transferSize / msgSize;

    // set up socket
    ctx = new zmq::context_t(1);
    socket = new zmq::socket_t(*ctx, ZMQ_DEALER);
    socket->connect(address.c_str());

    // set high water mark
    socket->setsockopt(ZMQ_HWM, (void *)&hwm, sizeof(hwm));

    // create a new message buffer
    zmq::message_t msg(msgSize);

    // send messages
    for (i=0; i < numIter; i++) {
        memset(msg.data(), 0, msgSize);
        socket->send(msg);
        msg.rebuild(msgSize);
    }

    // notify server of EOF
    memset(msg.data(), 0, msgSize);
    strEOF.copy((char *)msg.data(), strEOF.size());
    socket->send(msg);

    // clean up
    socket->close();
    delete socket;
    delete ctx;

    return 0;
}
#include <iostream>
#include <zmq.hpp>
#include <string>
#include <stdint.h>

// In-band end-of-stream marker expected at the start of the final message.
// <cstdio> defines its own EOF macro; drop any earlier definition first so
// this string marker does not cause a macro-redefinition diagnostic when that
// header is pulled in transitively.
#ifdef EOF
#undef EOF
#endif
#define EOF         "EOF"
// Binary size units, each one KILO_STEP times the previous.
#define KILO_STEP   1024
#define KBYTE       KILO_STEP
#define MBYTE       (KBYTE * KILO_STEP)
#define GBYTE       (MBYTE * KILO_STEP)

using namespace std;

int main()
{
    uint64_t transferred = 0;
    const string strEOF = EOF;
    const string address = "tcp://lo:5555";
    bool rc;
    zmq::context_t* ctx;
    zmq::socket_t* socket;

    // set up socket
    ctx = new zmq::context_t(1);
    socket = new zmq::socket_t(*ctx, ZMQ_ROUTER);
    socket->bind(address.c_str());

    // receive messages
    zmq::message_t msg;
    do {
        msg.rebuild();
        rc = socket->recv(&msg, 0);
        if (rc == true)
            transferred += msg.size();
    } while (strEOF.compare((char *)msg.data()) != 0);

    // clean up
    socket->close();
    delete socket;
    delete ctx;

    return 0;
}

_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to