[zeromq-dev] MDP protocol, detecting dead workers

2017-02-08 Thread Gyorgy Szekely
Hi,
Background:
I have a message broker written with cppzmq that implements the Majordomo
protocol. It works really well, except for one scenario: when a worker
crashes during processing. The protocol copes with this in the sense that no
new task is assigned to the dead worker, but the broker never realizes that
it lost a worker.
In my environment workers die quite often, and this is visible to the
broker: the TCP link goes down. My problem is that the broker is not aware
of such events, so it effectively leaks worker-related objects and reports
false stats on available resources (the worker reconnects as a new worker).

Question:
Is it possible to get the identity of disconnected peers on a ROUTER socket
without actually sending a message?

There's a dedicated socket for workers in the broker, and there's a monitor
attached to it which reports connection-closed events, but I found no way to
associate these events with a router identity. Is this intentional?
I also tried setting the ZMQ_ROUTER_MANDATORY flag and sending a
single-frame message consisting of the identity only, but it gets discarded
without ever raising an EHOSTUNREACH error.

The only way I could come up with is to send a real (heartbeat) message to
each worker, which triggers EHOSTUNREACH for disconnected workers, but such
messages queue up in busy workers. I wouldn't even consider this a
workaround...
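
In code, the probe I mean would look roughly like this with cppzmq (a sketch
only: the single "HEARTBEAT" frame stands in for the real 7/MDP heartbeat
envelope, and the worker list is my broker's own bookkeeping):

    #include <zmq.hpp>
    #include <cerrno>
    #include <string>
    #include <vector>

    // Probe every known worker; EHOSTUNREACH means the identity is no
    // longer routable. The downside described above: the probe queues up
    // behind the current job in busy workers.
    void probe_workers(zmq::socket_t &router, std::vector<std::string> &workers)
    {
        int mandatory = 1;
        router.setsockopt(ZMQ_ROUTER_MANDATORY, &mandatory, sizeof(mandatory));

        for (auto it = workers.begin(); it != workers.end(); ) {
            try {
                zmq::message_t id(it->data(), it->size());
                zmq::message_t ping("HEARTBEAT", 9);
                router.send(id, ZMQ_SNDMORE);
                router.send(ping, 0);
                ++it;
            } catch (const zmq::error_t &e) {
                if (e.num() != EHOSTUNREACH)
                    throw;
                it = workers.erase(it);   // peer behind this identity is gone
            }
        }
    }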

Any ideas on how to solve this correctly?

Regards,
   Gyorgy Szekely

Re: [zeromq-dev] router socket hangs on write (was detecting dead MDP workers)

2017-02-19 Thread Gyorgy Szekely
Hi Luca,
Unfortunately I'm not familiar with libzmq internals, so I can't decide
whether or not EAGAIN is appropriate. But as a library user I expect either:
sending a message on a mandatory router socket not to block when the queue
is below the HWM
-or-
documentation explicitly stating that ZMQ_ROUTER_MANDATORY must always be
used with ZMQ_DONTWAIT because the socket may occasionally block
(independently of the HWM).

I attached a code example to demonstrate the problem. The router socket
send blocks, the HWM is not reached (only 1 message is in the queue), and
the socket never recovers, because the pipe never returns to a state where
it accepts messages.
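
(The attachment doesn't survive in this archive; a minimal sketch of the
scenario it demonstrates, with a hypothetical endpoint and peer identity and
the cppzmq API of that era, would be along these lines:)

    #include <zmq.hpp>
    #include <cstdint>

    int main()
    {
        zmq::context_t ctx(1);

        zmq::socket_t router(ctx, ZMQ_ROUTER);
        int mandatory = 1;
        router.setsockopt(ZMQ_ROUTER_MANDATORY, &mandatory, sizeof(mandatory));
        router.bind("tcp://127.0.0.1:5555");

        // monitor the router socket; events arrive on a PAIR socket
        zmq_socket_monitor(static_cast<void *>(router),
                           "inproc://router-mon", ZMQ_EVENT_DISCONNECTED);
        zmq::socket_t mon(ctx, ZMQ_PAIR);
        mon.connect("inproc://router-mon");

        // a peer (not shown) connects with identity "worker-1", then drops
        for (;;) {
            zmq::message_t ev, addr;
            mon.recv(&ev);    // frame 1: 16-bit event id + 32-bit value
            mon.recv(&addr);  // frame 2: endpoint address
            uint16_t event = *static_cast<const uint16_t *>(ev.data());
            if (event != ZMQ_EVENT_DISCONNECTED)
                continue;

            zmq::message_t id("worker-1", 8);
            zmq::message_t body("ping", 4);
            router.send(id, ZMQ_SNDMORE);
            router.send(body, 0);   // <-- this send occasionally never returns
        }
    }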

I agree that this is a corner case; the timeframe in which the socket may
block is really short (sending at exactly the moment the peer disconnects),
but the operation still can't be called non-blocking. The attached example
triggers the issue with a high occurrence rate by sending the message when
the monitor reports a peer disconnect, but I could also reproduce the issue
without the monitor event (with a much lower occurrence rate, of course).

With my very limited knowledge of the library internals I would replace the
condition:
    router.cpp:213    if (it != outpipes.end ())

with something like this:

    router.cpp:213    if (it != outpipes.end () &&
                          it->second.pipe->check_active ())  // (out_active && state == active)

but it's probably not that simple. :)

Regards,
  Gyorgy

On Sat, Feb 18, 2017 at 7:32 PM, Luca Boccassi <luca.bocca...@gmail.com>
wrote:

> On Fri, 2017-02-17 at 10:53 +0100, Gyorgy Szekely wrote:
> > Hi,
> > Sorry for spamming the list :( I will rate limit myself.
> >
> > I reviewed the docs for ZMQ_ROUTER_MANDATORY and it's clear now that
> > the
> > router socket may block if the message can be routed but HWM is
> > reached and
> > ZMQ_DONTWAIT is not specified. This is the exact code path my
> > application
> > blocks in.
> >
> > The problem is that HWM is not reached in my case.
> > zmq::router_t::xsend()
> > checks HWM with zmq::pipe_t::check_write(), which returns false, but
> > not
> > because HWM is reached, but because the pipe state is
> > zmq::pipe_t::waiting_for_delimiter.
> >
> > Summary:
> > I don't think it's reasonable for zmq::router_t::xsend() to return -1
> > EAGAIN, when the corresponding pipe is being terminated. It's obvious
> > that
> > the message can't be sent in the future, there's no point in
> > retrying.
> >
> > (For the time being, as a workaround I specify ZMQ_DONTWAIT on the
> > send,
> > and I consider the worker dead with either EHOSTUNREACH or EAGAIN.)
> >
> > What's your opinion on this?
> >
> >
> > Regards,
> >   Gyorgy
>
> Is the pipe terminated when the underlying socket is disconnected? I
> can't remember and I'd have to double check, but if that's the case
> then it could come back, so EAGAIN would be appropriate, right?
>
> Also the check_write just returns true/false, and given it's in the hot
> path I'd be wary of overloading it to cater for a single corner case.
>
> > On Thu, Feb 16, 2017 at 10:44 PM, Gyorgy Szekely <hodito...@gmail.com
> > >
> > wrote:
> >
> > > Hi,
> > > I dug a bit deeper, here are my findings:
> > > - removing the on/off switching for the ZMQ_ROUTER_MANDATORY flag,
> > > and
> > > enabling it before the router socket bind: makes no difference
> > > - removing the monitor trigger and heartbeating the workers
> > > periodically
> > > (2.5 sec) drastically reduces the occurrence rate, the program
> > > hangs after
> > > 3-4 hours, instead of seconds. (in the background a worker
> > > connects/disconnects with 4 second period time)
> > >
> > > From this I suspect the issue appears in a small timeframe which is
> > > close
> > > to the monitor event, but otherwise hard to hit.
> > >
> > > With GDB I see the following:
> > > - in zmq::socket_base_t::send() the call to xsend() returns EAGAIN.
> > > This
> > > should not happen since the ZMQ_DONTWAIT is not specified.
> > > - ZMQ_DONTWAIT is not specified, so the function won't return -1,
> > > but
> > > block (see trace in prev mail).
> > >
> > > - inside zmq::router_t::xsend() the pipe is found in the outpipes
> > > map, but
> > > the check_write() on it returns false
> > > - the if(mandatory) check in this block (router.cpp:218) returns
> > > with -1,
> > > EAGAIN
> > > - a similar block 10 lines below returns with -1, EHOSTUNREACH
> > >
> > > Should both if(mandatory) checks return EHOSTUNREACH? [...]

Re: [zeromq-dev] router socket hangs on write (was detecting dead MDP workers)

2017-02-16 Thread Gyorgy Szekely
Hi,
I dug a bit deeper, here are my findings:
- removing the on/off switching for the ZMQ_ROUTER_MANDATORY flag, and
enabling it before the router socket bind: makes no difference
- removing the monitor trigger and heartbeating the workers periodically
(2.5 sec) drastically reduces the occurrence rate, the program hangs after
3-4 hours, instead of seconds. (in the background a worker
connects/disconnects with 4 second period time)

From this I suspect the issue appears in a small timeframe close to the
monitor event, but is otherwise hard to hit.

With GDB I see the following:
- in zmq::socket_base_t::send() the call to xsend() returns EAGAIN. This
should not happen, since ZMQ_DONTWAIT is not specified.
- because ZMQ_DONTWAIT is not specified, the function doesn't return -1 but
blocks (see trace in prev mail).

- inside zmq::router_t::xsend() the pipe is found in the outpipes map, but
the check_write() on it returns false
- the if(mandatory) check in this block (router.cpp:218) returns with -1,
EAGAIN
- a similar block 10 lines below returns with -1, EHOSTUNREACH

Should both if(mandatory) checks return EHOSTUNREACH? There's also a
comment in the header for bool mandatory saying that it will report EAGAIN,
but this contradicts the documentation.

Can you help to clarify?


Regards,
  Gyorgy



On Thu, Feb 16, 2017 at 12:22 PM, Gyorgy Szekely <hodito...@gmail.com>
wrote:

> Hi,
> Continuing my journey on detecting dead workers I reduced the design to
> the minimal, and eliminated the messy file descriptors.
> I only have:
> - a router socket, with some number of peers
> - a monitor socket attached to the router socket
>
> When the monitor detects a disconnect on the router socket:
> - do setsockopt(ZMQ_ROUTER_MANDATORY, 1);
> - send heartbeat message to every known peer
> - if EHOSTUNREACH returned: remove the peer
> - do setsockopt(ZMQ_ROUTER_MANDATORY, 0);
>
> What happens: _my app regularly hangs_ in zmq_msg_send(). Roughly 20% of
> the invocations. The call never returns, I have to kill the application.
>
> What am I doing wrong??? According to the RFCs, router sockets should
> never block.
> I attached a full stacktrace with info locals and args for each relevant
> frame (sorry for the machine readable format).
>
> Env:
> libzmq 4.2.1 stable, debug build
> Ubuntu 16.04 64bit (the same happens with ubuntu packaged lib)
>
> Regards,
>   Gyorgy
>
>

Re: [zeromq-dev] MDP protocol, detecting dead workers

2017-02-14 Thread Gyorgy Szekely
Hi,
The implemented protocol (ZMQ RFC 7/MDP) has application-level mutual
heartbeating between the broker and the worker. And this works fine: both
parties detect that the other side died via missing heartbeats. The problem
appears when the worker is assigned a long-running job: heartbeating is
_disabled_ while the job is being processed (as 7/MDP specifies). This
allows the worker to be single-threaded and avoids typical multithreaded
issues (e.g. the processing thread hangs while the heartbeating thread keeps
running, leaving the worker in an inconsistent state).

When a worker crashes during job processing my application doesn't realize
it, since no messages are flowing (the broker is waiting for the job
result), but libzmq does detect it, because the underlying socket always
gets closed. My goal is to keep the number of underlying sockets in libzmq
and the number of Worker-related objects in my application in sync at all
times.

I've googled around and found a few libzmq features that would suit my
needs:
- ZMQ_IDENTITY_FD - this was introduced and shortly afterwards removed from
the lib
- ZMQ_SRCFD - deprecated, but it's exactly what I need!
- "Peer-Address" metadata, the recommended replacement for ZMQ_SRCFD, but
not suitable for my needs

I know fds should be handled with care (monitor events are asynchronous,
fds get reused), but ZMQ_SRCFD solves my problem with the following ruleset:
1. When a Worker registers (first message over a connection), save the
underlying fd
- and -
2. Check whether this fd is in use by another Worker; if it is, that Worker
is dead, since libzmq reused its file descriptor
3. If a Worker's fd has been in closed state for a longer period (the
heartbeat expiry time), then it crashed and the fd was not reused (this
info comes from the monitor)
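
A sketch of rules 1 and 2 (assuming a cppzmq version that exposes
message_t::get(); the Worker bookkeeping itself is of course application
specific):

    #include <zmq.hpp>
    #include <map>
    #include <string>

    std::map<int, std::string> fd_to_worker;   // fd -> worker identity

    // Called for the first (registration) message received from a worker.
    void on_worker_register(zmq::message_t &first_frame,
                            const std::string &worker_identity)
    {
        int fd = first_frame.get(ZMQ_SRCFD);   // fd the message arrived on

        auto it = fd_to_worker.find(fd);
        if (it != fd_to_worker.end() && it->second != worker_identity) {
            // rule 2: libzmq reused this fd, so the previous owner is dead
            // remove_worker(it->second);      // hypothetical cleanup helper
        }
        fd_to_worker[fd] = worker_identity;    // rule 1: remember the fd
    }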

I don't know if this is considered an ugly hack by hardcore zeromq users,
but it looks like a legitimate ZMQ_SRCFD use-case to me. It would be nice if
it weren't removed in upcoming versions.
Any feedback welcome!

Regards,
  Gyorgy



On Mon, Feb 13, 2017 at 10:21 PM, Greg Young 
wrote:

> I believe the term here is application level heartbeats.
>
> It should also be supported that clients can heartbeat to server. It
> is not always that all clients want similar heartbeat timeouts.
>
> On Mon, Feb 13, 2017 at 4:07 PM, Michal Vyskocil
>  wrote:
> > Hi,
> >
> > You can take inspiration from malamute broker
> > https://github.com/zeromq/malamute
> >
> > There clients pings server regularly. The same does MQTT (just it's a
> > server, who pings clients).
> >
> > Sadly malamute is vulnerable to the same problem, that received service
> > request may get lost. Solution would be to let client to send a request
> > again after timeout, however wasn't yet implemented.
> >
> > ___
> > zeromq-dev mailing list
> > zeromq-dev@lists.zeromq.org
> > https://lists.zeromq.org/mailman/listinfo/zeromq-dev
>
>
>
> --
> Studying for the Turing test
> ___
> zeromq-dev mailing list
> zeromq-dev@lists.zeromq.org
> https://lists.zeromq.org/mailman/listinfo/zeromq-dev
>

Re: [zeromq-dev] MDP protocol, detecting dead workers

2017-02-15 Thread Gyorgy Szekely
Hi,
I assume the following:
When a dealer socket (worker) reconnects to a router socket (broker) due to
a transient network issue (reconnection happens at the libzmq level), the
new connection _always_ gets a new identity in the router socket, and _may_
get a different file descriptor (the fd might get reused). Workers don't
specify their identity.
Is this correct?

If it is, then I can deal with identity->fd associations fine.

And yes, you're right about the protocol improvements, I'll consider this
option too.

Regards,
Gyorgy


On Tue, Feb 14, 2017 at 9:18 PM, Doron Somech <somdo...@gmail.com> wrote:

> Using srcfd is problematic: zeromq handles reconnection, and the srcfd
> might change.
>
> To solve your problem I would change the design: continue sending
> heartbeats during long jobs and change the worker to a two-thread model.
>
> Alternatively you set a maximum time for a job, after which you consider
> the worker dead. If not dead you can handle the reconnection then.
>
> On Feb 14, 2017 17:33, "Gyorgy Szekely" <hodito...@gmail.com> wrote:
>
>> Hi,
>> The implemented protocol (ZMQ-RFC 7/MDP) has application level mutual
>> heartbeating between the broker and the worker. And this works fine: both
>> parties detect if the other side dies via missing heartbeats. The problem
>> appears when the worker is assigned a long running job, heartbeating is
>> _disabled_ while the job is being processed (as per 7/MDP specifies). This
>> enables the worker to be single threaded, and avoids typical multithreaded
>> issues (eg. processing thread hangs, heartbeating thread runs; worker in
>> inconsistent state).
>>
>> When a worker crashes during job processing my application doesn't
>> realize this since no messages are flowing (the broker is waiting for the
>> job result), but the libzmq detects this, as the socket is always closed.
>> My goal is to always keep in sync the number of underlying sockets in
>> libzmq and Worker related objects in my application.
>>
>> I've googled around and found a few libzmq features that would suit my
>> needs:
>> - ZMQ_IDENTITY_FD - this was introduced and shortly removed from the lib
>> - ZMQ_SRCFD - deprecated, but it's exactly what I need!
>> - "Peer-Address" metadata, the recommended replacement for ZMQ_SRCFD, but
>> not suitable for my needs
>>
>> I know fd's should be handled with care (monitor events are asynchronous,
>> fd's get reused), but ZMQ_SRCFD solves my problem with the following
>> ruleset:
>> 1. When a Worker registers (first message over a connection) save the
>> underlying fd
>> - and -
>> 2. Check that this fd is in use by another Worker, if it is: that Worker
>> is dead since libzmq reused its file descriptor
>>
>> 3. If a Worker's fd is in closed state for a longer period (heartbeat
>> expiry time), then it crashed and the fd was not re-used (get this info
>> from monitor)
>>
>> I don't know if this is considered as an ugly hack by hardcore zeromq
>> users, but it looks like a legitimate ZMQ_SRCFD use-case to me. It would be
>> nice if it wasn't removed in the upcoming versions.
>> Any feedback welcome!
>>
>> Regards,
>>   Gyorgy
>>
>>
>>
>> On Mon, Feb 13, 2017 at 10:21 PM, Greg Young <gregoryyou...@gmail.com>
>> wrote:
>>
>>> I believe the term here is application level heartbeats.
>>>
>>> It should also be supported that clients can heartbeat to server. It
>>> is not always that all clients want similar heartbeat timeouts.
>>>
>>> On Mon, Feb 13, 2017 at 4:07 PM, Michal Vyskocil
>>> <michal.vysko...@gmail.com> wrote:
>>> > Hi,
>>> >
>>> > You can take inspiration from malamute broker
>>> > https://github.com/zeromq/malamute
>>> >
>>> > There clients pings server regularly. The same does MQTT (just it's a
>>> > server, who pings clients).
>>> >
>>> > Sadly malamute is vulnerable to the same problem, that received service
>>> > request may get lost. Solution would be to let client to send a request
>>> > again after timeout, however wasn't yet implemented.
>>> >
>>> > ___
>>> > zeromq-dev mailing list
>>> > zeromq-dev@lists.zeromq.org
>>> > https://lists.zeromq.org/mailman/listinfo/zeromq-dev
>>>
>>>
>>>
>>> --
>>> Studying for the Turing test
>>> ___
>>> zeromq-dev mailing list
>>> zeromq-dev@lists.zeromq.org
>>> https://lists.zeromq.org/mailman/listinfo/zeromq-dev
>>>
>>
>>
>> ___
>> zeromq-dev mailing list
>> zeromq-dev@lists.zeromq.org
>> https://lists.zeromq.org/mailman/listinfo/zeromq-dev
>>
>
> ___
> zeromq-dev mailing list
> zeromq-dev@lists.zeromq.org
> https://lists.zeromq.org/mailman/listinfo/zeromq-dev
>

Re: [zeromq-dev] router socket hangs on write (was detecting dead MDP workers)

2017-02-17 Thread Gyorgy Szekely
Hi,
Sorry for spamming the list :( I will rate limit myself.

I reviewed the docs for ZMQ_ROUTER_MANDATORY and it's clear now that the
router socket may block if the message can be routed but the HWM is reached
and ZMQ_DONTWAIT is not specified. This is the exact code path my
application blocks in.

The problem is that the HWM is not reached in my case.
zmq::router_t::xsend() checks the HWM with zmq::pipe_t::check_write(), which
returns false, not because the HWM is reached, but because the pipe state is
zmq::pipe_t::waiting_for_delimiter.

Summary:
I don't think it's reasonable for zmq::router_t::xsend() to return -1 with
EAGAIN when the corresponding pipe is being terminated. It's obvious that
the message can't be sent in the future, so there's no point in retrying.

(For the time being, as a workaround, I specify ZMQ_DONTWAIT on the send
and consider the worker dead on either EHOSTUNREACH or EAGAIN.)
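
In code the workaround looks roughly like this (a sketch with the old-style
cppzmq send/recv API; mark_worker_dead() is a hypothetical helper for my
broker's bookkeeping):

    #include <zmq.hpp>
    #include <cerrno>

    // Send [identity][body] to a worker without blocking; treat both
    // EAGAIN and EHOSTUNREACH as "worker is gone".
    bool send_to_worker(zmq::socket_t &router,
                        zmq::message_t &identity, zmq::message_t &body)
    {
        try {
            if (!router.send(identity, ZMQ_SNDMORE | ZMQ_DONTWAIT))
                return false;              // EAGAIN: pipe not writable
            router.send(body, ZMQ_DONTWAIT);
            return true;
        } catch (const zmq::error_t &e) {
            if (e.num() == EHOSTUNREACH)
                return false;              // peer already disconnected
            throw;
        }
    }

    // caller:
    //   if (!send_to_worker(router, id, body))
    //       mark_worker_dead(id);         // hypothetical helper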

What's your opinion on this?


Regards,
  Gyorgy

On Thu, Feb 16, 2017 at 10:44 PM, Gyorgy Szekely <hodito...@gmail.com>
wrote:

> Hi,
> I dug a bit deeper, here are my findings:
> - removing the on/off switching for the ZMQ_ROUTER_MANDATORY flag, and
> enabling it before the router socket bind: makes no difference
> - removing the monitor trigger and heartbeating the workers periodically
> (2.5 sec) drastically reduces the occurrence rate, the program hangs after
> 3-4 hours, instead of seconds. (in the background a worker
> connects/disconnects with 4 second period time)
>
> From this I suspect the issue appears in a small timeframe which is close
> to the monitor event, but otherwise hard to hit.
>
> With GDB I see the following:
> - in zmq::socket_base_t::send() the call to xsend() returns EAGAIN. This
> should not happen since the ZMQ_DONTWAIT is not specified.
> - ZMQ_DONTWAIT is not specified, so the function won't return -1, but
> block (see trace in prev mail).
>
> - inside zmq::router_t::xsend() the pipe is found in the outpipes map, but
> the check_write() on it returns false
> - the if(mandatory) check in this block (router.cpp:218) returns with -1,
> EAGAIN
> - a similar block 10 lines below returns with -1, EHOSTUNREACH
>
> Should both if(mandatory) checks return EHOSTUNREACH? There's also a
> comment in the header for bool mandatory, that it will report EAGAIN, but
> this contradicts with the documentation.
>
> Can you help to clarify?
>
>
> Regards,
>   Gyorgy
>
>
> It
>
> On Thu, Feb 16, 2017 at 12:22 PM, Gyorgy Szekely <hodito...@gmail.com>
> wrote:
>
>> Hi,
>> Continuing my journey on detecting dead workers I reduced the design to
>> the minimal, and eliminated the messy file descriptors.
>> I only have:
>> - a router socket, with some number of peers
>> - a monitor socket attached to the router socket
>>
>> When the monitor detects a disconnect on the router socket:
>> - do setsockopt(ZMQ_ROUTER_MANDATORY, 1);
>> - send heartbeat message to every known peer
>> - if EHOSTUNREACH returned: remove the peer
>> - do setsockopt(ZMQ_ROUTER_MANDATORY, 0);
>>
>> What happens: _my app regularly hangs_ in zmq_msg_send(). Roughly 20% of
>> the invocations. The call never returns, I have to kill the application.
>>
>> What am I doing wrong??? According to the RFCs, router sockets should
>> never block.
>> I attached a full stacktrace with info locals and args for each relevant
>> frame (sorry for the machine readable format).
>>
>> Env:
>> libzmq 4.2.1 stable, debug build
>> Ubuntu 16.04 64bit (the same happens with ubuntu packaged lib)
>>
>> Regards,
>>   Gyorgy
>>
>>
>

[zeromq-dev] pub/sub pattern

2017-12-05 Thread Gyorgy Szekely
Hi ZeroMQ community,
In our application we use ZeroMQ for communication between backend services
and it works quite well (thanks for the awesome library). Up to now we have
relied on the request/reply pattern only (a Majordomo-derivative protocol),
where a broker distributes tasks among workers. Everything runs in its own
container, and scaling works like a charm: if workers can't keep up with the
load, we simply start some more and the protocol handles the rest. So far so
good.

Now we would like to use pub/sub: a component produces some data and
publishes an event about it. It doesn't care (and potentially can't even
know) who needs it. Interested peers subscribe to the topic. What I'm
puzzled by is scaling. If a subscriber can't keep up with the load I would
like to scale it out just like the workers. But in this case the events
won't be distributed: all instances receive the same set, increasing CPU
load but not throughput.

I would like a pub/sub where load is distributed among identical instances.
ZeroMQ has all kinds of fancy patterns (pirates and stuff), is there
something for this problem?

What I had in mind is equipping subscribers with a "groupId", which is the
same within a scaling group. Subscribers send their ids to the broker on
connection, and the broker publishes each topic to only one subscriber per
group. This means I can't use pub/sub sockets and have to reimplement the
behavior on router/dealer, but that's ok.
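
A rough sketch of the broker side of that idea (ROUTER towards subscribers;
the data structures and the round-robin choice are made up for
illustration):

    #include <zmq.hpp>
    #include <map>
    #include <string>
    #include <vector>

    // topic -> groupId -> subscriber identities
    std::map<std::string,
             std::map<std::string, std::vector<std::string>>> subs;
    std::map<std::string, size_t> rr;      // round-robin position per group

    // Deliver one event to exactly one subscriber of each group.
    void publish(zmq::socket_t &router, const std::string &topic,
                 const std::string &payload)
    {
        for (auto &group : subs[topic]) {
            auto &members = group.second;
            if (members.empty())
                continue;
            const std::string &id = members[rr[group.first]++ % members.size()];

            zmq::message_t ident(id.data(), id.size());
            zmq::message_t t(topic.data(), topic.size());
            zmq::message_t body(payload.data(), payload.size());
            router.send(ident, ZMQ_SNDMORE);
            router.send(t, ZMQ_SNDMORE);
            router.send(body, 0);
        }
    }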

What do you think, is there a better way?

Regards,
  Gyorgy


Re: [zeromq-dev] pub/sub pattern

2017-12-07 Thread Gyorgy Szekely
Thanks guys for the answers. For some reason I didn't consider push/pull in
the first place, but it suits my needs quite well. :)

My only concern is about recovery from failures. Let's say I create a
pipeline with 3 stages and 2 processing nodes for each stage. Now one of the
nodes in the middle stage fails, making the pipeline lose half of the
processed messages. What are my options for handling such scenarios? I
glanced over the Guide but didn't find anything relevant...

What kinds of failures do I want to handle?
- the business logic in a processing node fails (because of a problem with
the input data), but the node does not crash. In this case I'm thinking of
pushing forward an error message instead of a result. This way the sink
would detect the failure immediately and could act upon it. This case is ok.
- the node crashes while processing and doesn't push anything forward. How
should this be handled? I can think of a timeout in the sink, but is there
something better?
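
For the first case, the stage I have in mind would look roughly like this
(a sketch; process() and the endpoints are placeholders):

    #include <zmq.hpp>
    #include <stdexcept>
    #include <string>

    std::string process(zmq::message_t &task);   // placeholder business logic

    // One pipeline stage: PULL work in, PUSH either a result or an error
    // marker downstream, so the sink can account for every input.
    void run_stage(zmq::context_t &ctx)
    {
        zmq::socket_t in(ctx, ZMQ_PULL);
        in.connect("tcp://upstream:5557");        // hypothetical endpoints
        zmq::socket_t out(ctx, ZMQ_PUSH);
        out.connect("tcp://downstream:5558");

        for (;;) {
            zmq::message_t task;
            in.recv(&task);

            std::string reply;
            try {
                reply = process(task);
            } catch (const std::exception &e) {
                reply = std::string("ERROR ") + e.what();
            }
            zmq::message_t msg(reply.data(), reply.size());
            out.send(msg, 0);
        }
    }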


Regards,
  Gyorgy

On Wed, Dec 6, 2017 at 3:32 AM, Stephen Riesenberg <
stephen.riesenb...@gmail.com> wrote:

> It obviously gets complicated in a hurry, but to add to that:
>
> Can you create a broker per scaling group, which subscribes to the
> relevant topic and uses push sockets to load balance automagically to
> workers? It shouldn’t get overloaded if it’s just shuffling messages though
> it’s always possible, and that’s when you may need to switch away from
> pub/sub and use credit based flow or something.
>
> Or possibly you could even use a majordomo broker per group? You’d need to
> manage subscribers yourself probably like you suggested with router/dealer.
> This seems more complicated. Benefit is you could possibly reuse everything
> you’ve already built and just add new components to manage the pub/sub
> aspect to extend the network. If you’re using containers you’d just need to
> level up your automation to handle the added complexity of more networking
> configs, groups/clusters and the like.
>
> Or I wonder if you could combine majordomo brokers with zyre groups and
> use SHOUT to publish to your brokers (or a proxy for the broker). A
> subscription is a zyre group membership. A publisher shouts at that zyre
> group. The proxy, a zyre client, forwards messages to the actual broker (on
> localhost) who load balances for a scaling group.
>
> This all depends on volume and failure scenarios too.
>
> On Tue, Dec 5, 2017 at 5:38 AM Luca Boccassi <luca.bocca...@gmail.com>
> wrote:
>
>> On Tue, 2017-12-05 at 09:03 +0100, Gyorgy Szekely wrote:
>> > Hi ZeroMQ community,
>> > In our application we use ZeroMQ for communication between backend
>> > services
>> > and it works quite well (thanks for the awesome library). Up to now
>> > we
>> > relied on the request/reply pattern only (a majordomo derivative
>> > protocol),
>> > where a broker distributes tasks among workers. Everything runs in
>> > it's own
>> > container, and scaling works like charm: if workers can't keep up
>> > with the
>> > load, we can simply start some more, and the protocol handles the
>> > rest. So
>> > far so good.
>> >
>> > Now, we would like to use pub/sub: a component produces some data,
>> > and
>> > publishes an event about it. It doesn't care (and potentially can't
>> > even
>> > know) who needs it. Interested peers subscribe to the topic. What I'm
>> > puzzled with is scaling. If a subscriber can't keep up with the load
>> > I
>> > would like to scale it up just like the workers. But in this case the
>> > events won't be distributed, but all instances receive the same set,
>> > increasing CPU load, but not throughput.
>> >
>> > I would like a pub/sub where load is distributed among identical
>> > instances.
>> > ZeroMQ has all kinds of fancy patterns (pirates and stuff), is there
>> > something for this problem?
>> >
>> > What I had in mind is equipping subscribers with a "groupId", which
>> > is the
>> > same in scaling groups. Subscribers send their id's on connection to
>> > the
>> > broker, which publishes the topics to only one subscriber in each
>> > group.
>> > This means I can't use pub/sub sockets, but I have to reimplement the
>> > behavior on router/dealer, but that's ok.
>> >
>> > What do you think, is there a better way?
>> >
>> > Regards,
>> >   Gyorgy
>>
>> Sounds like what you want is similar to push-pull - load balancing is
>> embedded in that pattern, have a look at the docs
>>
>> --
>> Kind regards,
>> Luca Boccassi
>>
>
> ___
> zeromq-dev mailing list
> zeromq-dev@lists.zeromq.org
> https://lists.zeromq.org/mailman/listinfo/zeromq-dev
>
>


Re: [zeromq-dev] pub/sub pattern

2017-12-09 Thread Gyorgy Szekely
Thanks! I'll have a look.

On Sat, Dec 9, 2017 at 5:03 PM, Stephen Riesenberg <
stephen.riesenb...@gmail.com> wrote:

> Have you seen Apache Storm before? It solves the problems you're
> describing.
> It used to use zeromq as the transport and now is pluggable (I believe).
>
> On Thu, Dec 7, 2017 at 3:32 PM Gyorgy Szekely <hodito...@gmail.com> wrote:
>
>> Thanks guys for the answers. For some reason I didn't consider push-pull
>> in the first place, but it suits my need quite well. :)
>>
>> My only concern is about recovery from failures. Let's pretend I create a
>> pipeline with 3 stages, and 2 processing nodes for each stage. Now, one of
>> the nodes in the middle stage fail, making the pipeline lose half of the
>> processed messages. What are my options to handle such scenarios? I glanced
>> over The guide, but didn't find anything relevant...
>>
>> What kind of failures I want to handle?
>> - the business logic in a processing node fails (because of a problem
>> with the input data), but the node does not crash. In this case I'm
>> thinking of pushing forward an error message instead of a result. This way
>> the sink would detect the failure immediately and could act upon it. This
>> case is ok.
>> - the node crashes while processing and doesn't push anything forward.
>> How should this be handled? I can think of a timeout in the sink, but is
>> there something better?
>>
>>
>> Regards,
>>   Gyorgy
>>
>> On Wed, Dec 6, 2017 at 3:32 AM, Stephen Riesenberg <
>> stephen.riesenb...@gmail.com> wrote:
>>
>>> It obviously gets complicated in a hurry, but to add to that:
>>>
>>> Can you create a broker per scaling group, which subscribes to the
>>> relevant topic and uses push sockets to load balance automagically to
>>> workers? It shouldn’t get overloaded if it’s just shuffling messages though
>>> it’s always possible, and that’s when you may need to switch away from
>>> pub/sub and use credit based flow or something.
>>>
>>> Or possibly you could even use a majordomo broker per group? You’d need
>>> to manage subscribers yourself probably like you suggested with
>>> router/dealer. This seems more complicated. Benefit is you could possibly
>>> reuse everything you’ve already built and just add new components to manage
>>> the pub/sub aspect to extend the network. If you’re using containers you’d
>>> just need to level up your automation to handle the added complexity of
>>> more networking configs, groups/clusters and the like.
>>>
>>> Or I wonder if you could combine majordomo brokers with zyre groups and
>>> use SHOUT to publish to your brokers (or a proxy for the broker). A
>>> subscription is a zyre group membership. A publisher shouts at that zyre
>>> group. The proxy, a zyre client, forwards messages to the actual broker (on
>>> localhost) who load balances for a scaling group.
>>>
>>> This all depends on volume and failure scenarios too.
>>>
>>> On Tue, Dec 5, 2017 at 5:38 AM Luca Boccassi <luca.bocca...@gmail.com>
>>> wrote:
>>>
>>>> On Tue, 2017-12-05 at 09:03 +0100, Gyorgy Szekely wrote:
>>>> > Hi ZeroMQ community,
>>>> > In our application we use ZeroMQ for communication between backend
>>>> > services
>>>> > and it works quite well (thanks for the awesome library). Up to now
>>>> > we
>>>> > relied on the request/reply pattern only (a majordomo derivative
>>>> > protocol),
>>>> > where a broker distributes tasks among workers. Everything runs in
>>>> > it's own
>>>> > container, and scaling works like charm: if workers can't keep up
>>>> > with the
>>>> > load, we can simply start some more, and the protocol handles the
>>>> > rest. So
>>>> > far so good.
>>>> >
>>>> > Now, we would like to use pub/sub: a component produces some data,
>>>> > and
>>>> > publishes an event about it. It doesn't care (and potentially can't
>>>> > even
>>>> > know) who needs it. Interested peers subscribe to the topic. What I'm
>>>> > puzzled with is scaling. If a subscriber can't keep up with the load
>>>> > I
>>>> > would like to scale it up just like the workers. But in this case the
>>>> > events won't be distributed, but all instances receive the same set,
> >>>> > increasing CPU load, but not throughput. [...]

Re: [zeromq-dev] cppzmq revival and RFC on design goals and supported platforms

2018-05-23 Thread Gyorgy Szekely
Hi Simon,
This is great news! We've been using cppzmq in a message broker and an
accompanying communication library for 2 years now.

I fully agree with the declared goals. libzmq has a simple and concise API
with an object-oriented mindset. It works well on its own, but cppzmq makes
it a whole lot easier. What's particularly good about it:
- type safety and RAII: it's very straightforward to think in classes that
properly clean up resources on destruction
- higher-level functions: multipart messages are really nice, though the
API is/was a bit inconsistent (socket.send(msg) vs. msg.send(socket))
- header-only, so it's very easy to use. Header-only libraries usually mean
template-heavy monsters, but fortunately not in this case

What I personally really like is that it's a thin wrapper and doesn't try
to be more than libzmq. Methods usually map 1-to-1 to libzmq calls, there's
no hidden trickery, and the documentation at api.zeromq.org remains fully
relevant.

I haven't checked the recent updates (yet), but I found a few strange bits
while working with cppzmq, like the above-mentioned sending inconsistency,
or having to cast the socket to void* to use it in a pollset. Apart from
that I completely agree with the direction. This is what a thin C++ wrapper
around a good base C API should look like.
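
For reference, the two bits I mean look roughly like this with the cppzmq
version we use (zmq_addon.hpp for multipart_t; the details may have changed
in the recent updates):

    #include <zmq.hpp>
    #include <zmq_addon.hpp>

    void echo_once(zmq::socket_t &sock)
    {
        // the socket has to be cast to void* to build a pollset
        zmq::pollitem_t items[] = {
            { static_cast<void *>(sock), 0, ZMQ_POLLIN, 0 }
        };
        zmq::poll(items, 1, 1000);    // 1 second timeout

        if (items[0].revents & ZMQ_POLLIN) {
            zmq::multipart_t msg;
            msg.recv(sock);           // higher-level multipart receive
            msg.send(sock);           // note: msg.send(socket), not socket.send(msg)
        }
    }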

BTW, we're using the lib on Ubuntu 16.04 64bit / G++ 5.3, no issues so far.

Regards,
  Gyorgy

On Wed, May 23, 2018 at 6:07 PM,  wrote:

> Hi,
>
> Pawel Kurdybacha (kurdybacha) and me (sigiesec) have recently started to
> "revive" cppzmq (https://github.com/zeromq/cppzmq), the light-weight C++
> wrapper around libzmq. We added CI for Windows/MSVC, Linux and MacOS,
> implemented tests, cleaned up the CMake infrastructure, formatted the
> source code consistently and added some overview documentation.
>
> If you are using cppzmq or are interested in using it, we encourage you to
> have a look at the recent changes.
>
> One particular point we would like to seek feedback on are the design
> goals, which have recently been documented for the first time. I tried to
> extrapolate them from the actual design, and from the reasons we chose to
> use cppzmq in comparison to other alternatives. These are part of the
> https://github.com/zeromq/cppzmq/blob/master/README.md file:
>
> * cppzmq maps the libzmq C API to C++ concepts. In particular:
>* it is type-safe (the libzmq C API exposes various class-like concepts
> as void*)
>* it provides exception-based error handling (the libzmq C API provides
> errno-based error handling)
>* it provides RAII-style classes that automate resource management (the
> libzmq C API requires the user to take care to free resources explicitly)
> * cppzmq is a light-weight, header-only binding. You only need to include
> the header file zmq.hpp (and maybe zmq_addon.hpp) to use it.
> * zmq.hpp is meant to contain direct mappings of the abstractions provided
> by the libzmq C API, while zmq_addon.hpp provides additional higher-level
> abstractions.
>
> We would like to here from you if you agree with these design goals. If
> you have any opposing views, proposals for improvement or extension of the
> design goals, please share them on the mailing list or by sending a PR.
>
> Another part of the README is a section on the supported platforms. Please
> review this section, in particular if you do not use MacOS, Linux or
> Windows/MSVC with a recent compiler. If you successfully use a different
> platform, please send a PR to include this in the list of "Additional
> platforms that are known to work". Support for non-C++11 compilers is
> already partial only, and might be removed completely, unless there are
> users that still require such support.
>
> Of course, you are also invited to contribute extensions, new features,
> cleanup, further tests, etc. to cppzmq.
>
> Best regards
> Simon
>
> --
> i.A. Simon Giesecke
> BTC Business Technology Consulting AG
> Kurfürstendamm 33
> 10719 Berlin
> E-Mail: simon.giese...@btc-ag.com
>
> Legal notices:
> www.btc-ag.com/impressum.htm
> Commercial register: Amtsgericht Oldenburg HRB 4717
> Chairman of the supervisory board: Michael Heidkamp
> Executive board: Dr. Jörg Ritter, Dirk Thole
>
> ___
> zeromq-dev mailing list
> zeromq-dev@lists.zeromq.org
> https://lists.zeromq.org/mailman/listinfo/zeromq-dev
>


Re: [zeromq-dev] Slow joiner syndrome solution for XSUB/XPUB broker based system

2018-05-28 Thread Gyorgy Szekely
Hi Tomer,
As far as I know the message from the publisher will reach the broker.
According to the docs, the PUB socket only drops messages in the mute state
(HWM reached), and that's not the case here. The message will be sent as
soon as the connection is established, and socket termination blocks until
the send completes, unless you set linger to zero.

The slow-joiner problem means that subscriptions may not be active by the
time the publisher sends the message, either because the subscriber is not
yet running, or because the subscribe calls themselves are asynchronous (by
the time setsockopt(ZMQ_SUBSCRIBE) returns, the broker is not yet aware of
it). The zmq guide shows mitigations for this problem in the Advanced
Publish-Subscribe chapter.
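
The self-synchronization you describe below (pinging your own SUB socket
through the broker before publishing for real) would look roughly like this
(a sketch; the private sync topic name is made up, and the PUB/SUB sockets
are assumed to be already connected to the broker's two endpoints):

    #include <zmq.hpp>
    #include <string>

    // Publisher side: block until a dummy message makes the round trip
    // through the broker, which proves both connects are up.
    void wait_until_connected(zmq::socket_t &pub, zmq::socket_t &sub)
    {
        const std::string sync_topic = "sync.self";   // private topic
        sub.setsockopt(ZMQ_SUBSCRIBE, sync_topic.data(), sync_topic.size());

        zmq::pollitem_t items[] = {
            { static_cast<void *>(sub), 0, ZMQ_POLLIN, 0 }
        };
        for (;;) {
            zmq::message_t ping(sync_topic.data(), sync_topic.size());
            pub.send(ping, 0);
            zmq::poll(items, 1, 100);                 // retry every 100 ms
            if (items[0].revents & ZMQ_POLLIN) {
                zmq::message_t echo;
                sub.recv(&echo);                      // round trip complete
                return;
            }
        }
    }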

Regards,
  Gyorgy

On Mon, May 28, 2018 at 11:06 AM, Tomer Eliyahu 
wrote:

> Hi,
>
>
>
> I know this topic was probably discussed before, I couldn't find a proper
> solution, so I implemented something a bit different. I'm not sure if this
> solves all pitfalls; I'll be grateful for comments.
>
>
>
> We have a system with a XPUB-XSUB broker running as a separate process in
> the system (binds frontend to ipc:///tmp/publishers  and backend to
> ipc:///tmp/subscribers).
>
>
>
> Clients of the broker have both SUB socket for receiving messages, and a
> PUB socket for sending messages. When a client boots, it connects both its
> PUB and SUB sockets to the broker's endpoints, and subscribes to the topic
> of interest.
>
>
> For the sake of simplicity, lets assume there we have only the broker, a
> publisher and a subscriber processes in the system:
>
> We make sure that the broker process starts first, then a subscriber which
> connects and subscribes to the topic, and only then start the publisher.
> The publisher then sends a single message and terminates.
>
> Obviously, the message is lost due to the slow joiner syndrome - I assume
> the reason for that is because the publisher process zmq_connect() call is
> asynchronous, therefore the connect is not actually complete by the time we
> send the message.
>
>
>
> I thought of a possible solution for this - basically we want to
> synchronize the connect operation done by the publisher. Having both PUB
> and SUB socket, we can simply send a dummy message from PUB to SUB on the
> same publisher process until the first message is receieved, and then it is
> guarantied that the connect is done and consecutive messages (now to "real"
> topics with actual subscribers) will not be lost.
>
>
>
> The only part i'm not sure about is the subscriber side - assuming the
> subscriber boots, connects and subscribes _before_ we start the publisher -
> is it guarantied that no message will be lost (assuming ofcourse the
> subscriber doesn't crash / unsubscribe / etc.) ?
>
>
>
> Thanks,
>
> Tomer
>
>
> ___
> zeromq-dev mailing list
> zeromq-dev@lists.zeromq.org
> https://lists.zeromq.org/mailman/listinfo/zeromq-dev
>
>


[zeromq-dev] Linger vs inproc

2018-04-05 Thread Gyorgy Szekely
Hi all,
I would like to clarify the effect of the linger setting on inproc
transports.

Background:
I have a message broker that has a stateful routing thread and a number of
front-end threads that manage websocket sessions. The routing and websocket
threads communicate over the zmq inproc transport with router and dealer
sockets (respectively). The routing is stateful and must be notified of
websocket session closure; this is done with a message (websocket ->
routing). Closing a websocket session also closes the corresponding zmq
dealer socket.

Question:
So I have a dealer socket connected to a router over inproc, and I close
this socket immediately after sending a message. I want to be confident in
the following 2 statements:
- the message before the close never gets dropped
- the socket close never blocks for a considerable amount of time (let's
say more than 1ms)
(The router socket is permanent)

Does libzmq guarantee this? How should I set the linger time?

The API docs state that the inproc transport can run with a thread-less
context, which made me think that sends on inproc are practically
synchronous, and that by the time send returns the message is already in the
receiving socket's input queue. Is this true?
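
For context, what the websocket thread does is roughly this (a sketch; the
endpoint and the closure payload are placeholders, and the linger value is
exactly the part I'm unsure about):

    #include <zmq.hpp>

    void close_session(zmq::context_t &ctx)
    {
        zmq::socket_t dealer(ctx, ZMQ_DEALER);
        dealer.connect("inproc://routing");   // permanent ROUTER on the other end

        // ... the websocket session runs ...

        zmq::message_t bye("SESSION_CLOSED", 14);
        dealer.send(bye, 0);

        int linger = 100;                     // ms; is 0 safe here, or never needed?
        dealer.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
        dealer.close();                       // must this ever block?
    }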

Regards,
  Gyorgy Szekely


[zeromq-dev] Migrate app to threadsafe sockets

2019-11-30 Thread Gyorgy Szekely
Hi all,
I'm planning to migrate a Majordomo-like broker from ROUTER/DEALER sockets
to the threadsafe CLIENT/SERVER ones. Right now the protocol uses multipart
messages (similar to zmq RFC 7), where the broker only pops off the first 2
frames of the message (identity, routing_info) and routes the message
(passes it back to libzmq) without ever touching the payload frames.

As I understand it, threadsafe sockets don't support multipart messages.
The routing_info part of the message has variable size and usually grows as
a message is passed on. I don't want to reallocate/copy the whole message to
accommodate a larger routing_info; that would hurt performance. Is there a
common practice for solving this problem?

I have 2 things in mind:
- Use a fixed buffer size for routing_info that's large enough to hold all
possible sizes. Unfortunately some fields of the routing_info are user
defined with arbitrary size, so finding the right size is not trivial;
moreover I feel this approach is just too rigid.
- Use some scatter/gather API. api.zeromq.org states that there's an
intention to provide such an API for threadsafe sockets; does it already
exist? As I see in zmq.h there are some deprecated iovec-based functions,
but I don't think it's a good idea to start building new code on these...
(do they work at all with threadsafe sockets?)
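
To illustrate the first option, this is the kind of single-frame layout I
mean (a sketch; the 2-byte length prefix and the 256-byte routing area are
arbitrary choices):

    #include <zmq.hpp>
    #include <cstdint>
    #include <cstring>
    #include <stdexcept>
    #include <string>

    // One CLIENT/SERVER frame: a fixed-size routing area followed by the
    // payload. routing_info can grow up to ROUTING_MAX without the payload
    // ever being copied again.
    constexpr size_t ROUTING_MAX = 256;    // a guess at "large enough"

    zmq::message_t pack(const std::string &routing, const std::string &payload)
    {
        if (routing.size() > ROUTING_MAX)
            throw std::length_error("routing_info too large");

        zmq::message_t msg(2 + ROUTING_MAX + payload.size());
        auto *p = static_cast<uint8_t *>(msg.data());
        uint16_t len = static_cast<uint16_t>(routing.size());
        std::memcpy(p, &len, 2);                             // actual routing length
        std::memcpy(p + 2, routing.data(), routing.size());  // routing area (padded)
        std::memcpy(p + 2 + ROUTING_MAX, payload.data(), payload.size());
        return msg;
    }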

Any other idea, comments?

Regards,
  Gyorgy Szekely


[zeromq-dev] Flow control with router socket

2020-05-14 Thread Gyorgy Szekely
Hi All,

I have a message router that routes messages between different types of
producers and consumers. The socket types are router (message router) and
dealer (producer, consumer). Currently, if a consumer is not fast enough,
messages start to queue up (dealer input queue, router output queue), and
eventually the router starts to drop messages to avoid blocking.

I would like to implement some kind of flow control. With plain TCP I could
rely on the built-in flow control: write blocks if the consumer is
overloaded, and producers can be sanctioned by not reading their sockets.

With ZMQ router/dealer I can detect that a consumer is slow by receiving
EAGAIN on send, but as far as I understand I can't "slow down" a specific
producer, because the router socket does fair queuing. So I have to send
application-layer "stop sending" and "continue" messages to specific
producers...
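
That application-layer approach would amount to something like credit-based
flow control (as in the Guide's file-transfer example). A rough sketch of
the producer side, with a made-up "CREDIT" grant message:

    #include <zmq.hpp>
    #include <cstring>
    #include <string>

    std::string next_work_item();    // placeholder for real work

    // Producer: only push work while it holds credit; the router grants
    // more credit as the consumer drains its queue.
    void produce(zmq::socket_t &dealer)
    {
        int credit = 0;
        for (;;) {
            // absorb any pending CREDIT grants from the router
            zmq::message_t ctrl;
            while (dealer.recv(&ctrl, ZMQ_DONTWAIT)) {
                if (ctrl.size() >= 6 &&
                    std::memcmp(ctrl.data(), "CREDIT", 6) == 0)
                    credit += 10;    // each grant is worth 10 messages
            }

            if (credit == 0) {       // out of credit: wait for the router
                zmq::pollitem_t items[] = {
                    { static_cast<void *>(dealer), 0, ZMQ_POLLIN, 0 }
                };
                zmq::poll(items, 1, -1);
                continue;
            }

            std::string work = next_work_item();
            zmq::message_t msg(work.data(), work.size());
            dealer.send(msg, 0);
            --credit;
        }
    }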

Is there a better way to do this? I would rather not reinvent the wheel;
TCP already has a sophisticated mechanism for this.

Regards,
   Gyorgy Szekely


Re: [zeromq-dev] Flow control with router socket

2020-05-14 Thread Gyorgy Szekely
Thank you guys for the answers!
That's plenty of information to get me going.

Regards,
   Gyorgy Szekely

On Thu, May 14, 2020 at 6:13 PM Brett Viren  wrote:

> I was going to reply similarly.  Once a message is outside a socket,
> it's out of ZeroMQ's hands and the responsibility of the application.
>
> This aspect is somewhat fresh in my own learning.  The core issue here
> (in my mind) is that the broker (router) severs any direct communication
> between producer and consumer.  Thus any back pressure from consumer to
> producer must be communicated through the broker application.  It may
> sound obvious to say it like this but it took me a bit to internalize
> this basic idea.
>
> If it's any help to look at some examples, I've recently implemented a
> form of credit-based flow control following the ZeroMQ guide
> description.  It adds a runtime choice to use ROUTER/DEALER or
> SERVER/CLIENT.  The implementation is somewhat enmeshed in other choices
> in my library but it could still serve as a general example.
>
> Actually, I implemented it twice, once in PyZMQ and once in cppzmq.
> Barring some bug, the two should inter-operate.
>
> PyZMQ:
>
>   https://github.com/brettviren/zio/tree/master/python/zio/flow
>
> cppzmq:
>
>   https://github.com/brettviren/zio/blob/master/inc/zio/flow.hpp
>   https://github.com/brettviren/zio/blob/master/src/flow.cpp
>
> (flow.cpp is kind of a mess due to me still trying to understand
> boost.sml which I use for the FSM.  I welcome any critiques if anyone
> cares to offer them.)
>
> One caveat, this code is Free to use but I don't consider it released.
> I may still end up making drastic changes.
>
> But, to understand (this implementation of) credit-based flow control
> this documentation and diagram may be more useful than the code:
>
>   https://brettviren.github.io/zio/flow.html
>
>
> And, one last comment: The Python implementation includes a "broker"
> that does a little dance in order to hook up the "producer" and the
> "consumer" ends in a way that allows each to think they are simple
> clients and thus they can reuse the same "flow protocol" code in a
> symmetric manner regardless of which role each endpoint plays.  I have
> some rambling about how the broker does this dance:
>
>   https://brettviren.github.io/zio/flow-broker.html
>
> There are likely other, and simpler ways to do this.  I'd be interested
> to hear any ideas on that.  The main issue is the broker needs to spawn
> a "back end" consumer (producer) to service the "front end" producer
> (consumer).  The broker has no a'priori info about its front end clients
> and what they may want from a back end so this spawning must be done as
> a dynamic reaction to the first message from a "front end".  That and my
> desire for symmetric "flow protocol" code leads to this somewhat
> contorted dance.
>
>
> Cheers,
> -Brett.
>
>
> Kevin Sapper  writes:
>
> > Hi Gyorgy,
> >
> > back-pressure is something very specific to your application. ZeroMQ
> itself only implements blocking or dropping depending on the socket types
> > you're using.
> >
> > To implement a credit-based flow have a look into the guide:
> http://zguide.zeromq.org/page:all#Transferring-Files
> >
> > //Kevin
> >
> > Am Do., 14. Mai 2020 um 09:55 Uhr schrieb Gyorgy Szekely <
> hodito...@gmail.com>:
> >
> >  Hi All,
> >
> >  I have a message router that routes messages between different types of
> producers and consumers. Socket types are router (message
> >  router), dealer (producer, consumer). Currently if a consumer is not
> fast enough messages start to queue up (dealer input queue, router
> >  output queue), eventually the router starts to drop messages to avoid
> blocking.
> >
> >  I would like to implement some kind of flow control. With plain TCP I
> can rely on the built-in flow control: write blocks if consumer is
> >  overloaded, and producers can be sanctioned by not reading their socket.
> >
> >  With ZMQ router/dealer I can detect  if a consumer is slow by receiving
> EAGAIN on send, but as far as I understand I can't "slow down" a
> >  specific producer, because router socket does fair queuing. So I have
> to do application layer "stop sending" and "continue" messages and
> >  send them to specific producers...
> >
> >  Is there any better way to do this? I would rather not reinvent the
> wheel, TCP already has a sophisticated mechanism for this.
> >
> >  Regards,
> > Gyorgy Szekely