Hi Pieter,
Thanks for the response. Attached is a test case where setting ZMQ_HWM doesn't
work. Setting ZMQ_HWM to 1024 with 1 KiB messages should limit the queue's
size to ~1 MiB, right? "top" still reports around one GiB of memory usage when
sending that amout of data.
This is with ZMQ v.2.1.10.
Cheers,
Clemens
On Friday 18 November 2011 08.14:49 Pieter Hintjens wrote:
> On Fri, Nov 18, 2011 at 3:59 AM, Clemens Lutz <[email protected]> wrote:
> > I've been writing a benchmark to verify ZeroMQ's performance is near
> > line-rate on the machines I'm using. However, I keep running into the
> > issue of ZMQ allocating memory on the sender's side equal to the amount
> > I'm sending. If I want to send more data than I have physical memory
> > the program will exit with an "out of memory" error.
>
> zmq_send is asynchronous, so does not send right away but queues the
> message in memory. Actual sending is limited by the network capacity.
> So messages will build up. Setting ZMQ_HWM on the sender socket should
> limit the number of messages. If you have a test case where setting
> ZMQ_HWM doesn't work, please post it.
>
> -Pieter
> _______________________________________________
> zeromq-dev mailing list
> [email protected]
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
#include <iostream>
#include <zmq.hpp>
#include <string>
#include <stdint.h>
#define HWM_VAL 1024
#define EOF "EOF"
#define KILO_STEP 1024
#define KBYTE KILO_STEP
#define MBYTE (KBYTE * KILO_STEP)
#define GBYTE (MBYTE * KILO_STEP)
using namespace std;
int main()
{
const string strEOF = EOF;
const string address = "tcp://localhost:5555";
const uint64_t msgSize = KBYTE;
const uint64_t transferSize = GBYTE;
const uint64_t hwm = HWM_VAL;
uint64_t i, numIter;
zmq::context_t* ctx;
zmq::socket_t* socket;
numIter = transferSize / msgSize;
// set up socket
ctx = new zmq::context_t(1);
socket = new zmq::socket_t(*ctx, ZMQ_DEALER);
socket->connect(address.c_str());
// set high water mark
socket->setsockopt(ZMQ_HWM, (void *)&hwm, sizeof(hwm));
// create a new message buffer
zmq::message_t msg(msgSize);
// send messages
for (i=0; i < numIter; i++) {
memset(msg.data(), 0, msgSize);
socket->send(msg);
msg.rebuild(msgSize);
}
// notify server of EOF
memset(msg.data(), 0, msgSize);
strEOF.copy((char *)msg.data(), strEOF.size());
socket->send(msg);
// clean up
socket->close();
delete socket;
delete ctx;
return 0;
}
#include <iostream>
#include <zmq.hpp>
#include <string>
#include <stdint.h>
#define EOF "EOF"
#define KILO_STEP 1024
#define KBYTE KILO_STEP
#define MBYTE (KBYTE * KILO_STEP)
#define GBYTE (MBYTE * KILO_STEP)
using namespace std;
int main()
{
uint64_t transferred = 0;
const string strEOF = EOF;
const string address = "tcp://lo:5555";
bool rc;
zmq::context_t* ctx;
zmq::socket_t* socket;
// set up socket
ctx = new zmq::context_t(1);
socket = new zmq::socket_t(*ctx, ZMQ_ROUTER);
socket->bind(address.c_str());
// receive messages
zmq::message_t msg;
do {
msg.rebuild();
rc = socket->recv(&msg, 0);
if (rc == true)
transferred += msg.size();
} while (strEOF.compare((char *)msg.data()) != 0);
// clean up
socket->close();
delete socket;
delete ctx;
return 0;
}
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev