Hi Francesco,

I implemented such an algorithm in C++ which I call "zipper".

The idea is simply to maintain a min-heap priority queue keyed on the
timestamp and to surround it with policy logic that decides when to
push and pop by examining the system clock.  I've implemented two
policies: either a maximum latency bound is enforced at the cost of
possible message loss, or the merge is lossless at the risk of
unbounded latency.
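
As a rough illustration of the latency-bounded policy described above,
here is a minimal C++17 sketch.  It is not the code from zipper.hpp.
The names Msg and LatencyBoundedMerge, and the choice to pass the
current time in explicitly, are assumptions made for the example.  A
message is dropped if it arrives after a message with a larger
timestamp has already been released, and the head of the heap is
released as soon as any buffered message has waited max_latency.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <optional>
#include <queue>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Hypothetical message type: a payload tagged with the sender's timestamp.
struct Msg {
    std::uint64_t timestamp;  // event time assigned by the publisher
    std::string payload;
};

// Illustrative latency-bounded merge (lossy policy).  Assumed design,
// not the zipper.hpp API.
class LatencyBoundedMerge {
public:
    using Clock = std::chrono::steady_clock;

    explicit LatencyBoundedMerge(Clock::duration max_latency)
        : max_latency_(max_latency) {
        assert(max_latency >= Clock::duration::zero());
    }

    // Buffer a message that arrived at local time `now`.  Returns false
    // (message dropped) if releasing it later would break the ordering
    // of what has already been output.
    bool push(Msg msg, Clock::time_point now) {
        if (released_any_ && msg.timestamp < last_released_) return false;
        arrivals_.insert(now);
        heap_.push(Entry{std::move(msg), now});
        return true;
    }

    // Release the smallest-timestamp message once any buffered message
    // has waited max_latency.  Call repeatedly until it returns nullopt.
    std::optional<Msg> pop(Clock::time_point now) {
        if (heap_.empty()) return std::nullopt;
        // The oldest arrival decides whether the head must be released.
        if (*arrivals_.begin() + max_latency_ > now) return std::nullopt;
        Entry top = heap_.top();
        heap_.pop();
        arrivals_.erase(arrivals_.find(top.arrival));
        last_released_ = top.msg.timestamp;
        released_any_ = true;
        return top.msg;
    }

private:
    struct Entry {
        Msg msg;
        Clock::time_point arrival;  // local receive time
    };
    // Comparator that turns std::priority_queue into a min-heap.
    struct Later {
        bool operator()(const Entry& a, const Entry& b) const {
            return a.msg.timestamp > b.msg.timestamp;
        }
    };

    Clock::duration max_latency_;
    std::priority_queue<Entry, std::vector<Entry>, Later> heap_;
    std::multiset<Clock::time_point> arrivals_;  // arrival times in heap_
    std::uint64_t last_released_ = 0;
    bool released_any_ = false;
};
```

In an application, push() would be called for each message read from
the input socket with Clock::now(), and pop() would be called in a loop
(for example after each poll timeout) until it returns std::nullopt.
The lossless policy would instead hold the head until every input
stream is known to have nothing older pending, which is why its latency
has no bound.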

It is a rather simple pattern, and this description alone may be
enough to implement it yourself, but you may also take a look at this
repo, which has the code, performance results and other docs:

https://github.com/brettviren/zipper

Though I failed to make it explicit, this code may be considered
licensed under the LGPL.  Let me know if you wish to use the code and
I'll add proper license info.

The zipper.hpp implementation is written in terms of C++ data objects
and is independent of ZeroMQ per se (it needs only the C++ standard
library).  However, it was written with the assumption that it would be
sandwiched between ZeroMQ input and output sockets.  Providing a layer
to marshal data into and out of the zipper is then the duty of the
application.
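
To make the marshalling duty concrete, here is a minimal sketch of one
possible layer, using only the C++ standard library.  The wire format
(an 8-byte big-endian timestamp followed by the payload bytes) and the
names Stamped, encode and decode are assumptions for illustration.
They are not taken from zipper.hpp or from Francesco's applications.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>
#include <vector>

// Hypothetical in-memory form of a message handed to the merge.
struct Stamped {
    std::uint64_t timestamp;
    std::string payload;
};

// Serialize as: 8-byte big-endian timestamp, then the raw payload.
std::vector<std::uint8_t> encode(const Stamped& s) {
    std::vector<std::uint8_t> out;
    out.reserve(8 + s.payload.size());
    for (int shift = 56; shift >= 0; shift -= 8) {
        out.push_back(static_cast<std::uint8_t>((s.timestamp >> shift) & 0xFF));
    }
    out.insert(out.end(), s.payload.begin(), s.payload.end());
    return out;
}

// Parse a received frame.  Returns nullopt if the frame is too short
// to hold the timestamp prefix.
std::optional<Stamped> decode(const std::uint8_t* data, std::size_t size) {
    assert(data != nullptr || size == 0);
    if (size < 8) return std::nullopt;
    Stamped s{0, {}};
    for (std::size_t i = 0; i < 8; ++i) {
        s.timestamp = (s.timestamp << 8) | data[i];
    }
    s.payload.assign(reinterpret_cast<const char*>(data + 8), size - 8);
    return s;
}
```

On the receiving side the application would pass the data pointer and
size of each incoming message frame to decode() and feed the result to
the merge.  On the output side it would copy the bytes from encode()
into the outgoing frame.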

Note, my repo was for development purposes.  The zipper.hpp file was
then copied into a production repository and that copy may have some
bug fixes which I have not ported back to the stand-alone development
version.  The production version is here:

https://github.com/DUNE-DAQ/trigger/blob/develop/plugins/zipper.hpp

-Brett.

On Wed, Nov 9, 2022 at 5:20 PM Francesco <francesco.monto...@gmail.com> wrote:
>
> Hi all,
>
> I have written two applications using ZMQ PUB-SUB pattern (over TCP 
> transport).
> The subscriber application has its SUB socket connected to multiple PUBs 
> (multiple tcp endpoints). Each message sent by the PUB encodes the timestamp 
> (as obtained from clock_gettime() syscall at TX side using monotonically 
> increasing clock) of the event described by the ZMQ message.
>
> The subscriber needs to process the data stream _strictly_ in order. However 
> the multiple publishers have no coordination and they will emit messages at 
> different rates, each with its own timestamp. The only guarantee that I have, 
> according to ZMQ docs, is that the SUB socket will perform "fair dequeueing", 
> but that's not enough to guarantee that every zmq_msg_t received from the SUB 
> socket will have a monotonically increasing timestamp: it depends on the 
> filling level of the TCP rx/tx kernel buffers, the zmq HWMs, etc.
>
> For this reason I'm looking for some algorithm that
> * allows me to push zmq_msg_t pulled out of the SUB socket (without strict 
> time ordering)
> * allows me to pull out zmq_msg_t that have a timestamp monotonically 
> increasing
> * introduces a fixed max latency of N msecs (configurable)
>
> Do you have any pointer for such kind of problem?
> Anybody already hit a similar issue?
>
> Thanks for any help,
>
> Francesco Montorsi
>
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev@lists.zeromq.org
> https://lists.zeromq.org/mailman/listinfo/zeromq-dev