Hello Kevin,

One of the main features of ZMQ is that it offers an alternative approach
to concurrency. If you use ZMQ properly, there is no data sharing between
threads, so you don't need any lock-free or thread-safe data structures.

For your particular task, consider using two threads: a provider (P) and
a consumer (C). The threads communicate over REQ-REP, and P has an
additional SUB socket for receiving the broadcast messages. Something like
this:

--> SUB --> P --> REP <--> REQ <--> C

At startup, P subscribes to the PUB messages and starts polling on both of
its sockets. C sends P a request for a message and polls for the reply. P
receives the request from C, but since there are no messages to process
yet, it sets a flag indicating that C is waiting and goes back to polling.
Both threads are sleeping at this point, so there is no CPU consumption.

Upon receiving a message on SUB, P adds it to a queue of incoming messages
using any logic you deem necessary. After that, it checks whether C is
waiting and, if so, forwards the head of the queue to C and clears the
flag. C handles the message and sends the next request to P. If the queue
is not empty, P forwards the head of the queue to C; otherwise it sets the
flag again.
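
To make P's bookkeeping concrete, here is a minimal sketch in plain C. It
covers only the queue and the "C is waiting" flag; the socket I/O is left
out on purpose. All names (provider_t, provider_on_request,
provider_on_message, reply_fn, record_reply, QUEUE_CAP) are made up for
illustration, an int stands in for a message, and the reply callback stands
in for sending on the REP socket. A full queue simply rejects the new
message here, which is only a placeholder policy.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define QUEUE_CAP 8            /* hypothetical capacity */

typedef struct {
    int    items[QUEUE_CAP];   /* an int stands in for a message */
    size_t head;               /* index of the oldest queued message */
    size_t count;              /* number of queued messages */
    bool   consumer_waiting;   /* set when C asked and the queue was empty */
} provider_t;

/* Hypothetical hook: a real P would send the reply on its REP socket. */
typedef void (*reply_fn)(int msg, void *ctx);

/* Example hook that just records the last message handed to C. */
static void record_reply(int msg, void *ctx)
{
    *(int *)ctx = msg;
}

static void provider_init(provider_t *p)
{
    p->head = 0;
    p->count = 0;
    p->consumer_waiting = false;
}

/* Pop the head of the queue, hand it to C and clear the flag. */
static void provider_forward_head(provider_t *p, reply_fn reply, void *ctx)
{
    assert(p->count > 0);
    int msg = p->items[p->head];
    p->head = (p->head + 1) % QUEUE_CAP;
    p->count--;
    p->consumer_waiting = false;
    reply(msg, ctx);
}

/* Called when a request from C arrives. */
static void provider_on_request(provider_t *p, reply_fn reply, void *ctx)
{
    if (p->count > 0)
        provider_forward_head(p, reply, ctx);
    else
        p->consumer_waiting = true;   /* nothing to send yet */
}

/* Called when a message arrives on SUB. Returns false if it was rejected. */
static bool provider_on_message(provider_t *p, int msg,
                                reply_fn reply, void *ctx)
{
    if (p->count == QUEUE_CAP)
        return false;                 /* placeholder policy: reject */
    p->items[(p->head + p->count) % QUEUE_CAP] = msg;
    p->count++;
    if (p->consumer_waiting)
        provider_forward_head(p, reply, ctx);
    return true;
}
```

In a real program, P's poll loop would call provider_on_request when the
REP socket is readable and provider_on_message when the SUB socket is
readable. Since both calls happen on P's thread, nothing here needs
locking.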

This way only P accesses the queue, so it does not have to be thread-safe.
C sees only one message at a time, and is either handling that message or
sleeping while it waits for P's reply.

If incoming traffic is beyond what C can handle, P will know that C is
still busy (the flag is not set) and can drop some messages from the queue,
for example the oldest ones.
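
Since you need the oldest messages tossed when the queue is full, P's queue
could look roughly like the sketch below. It is a plain single-threaded
ring in C, with no atomics, because only P touches it. The names (ring_t,
ring_push_drop_oldest, ring_pop, RING_CAP) are hypothetical and an int
stands in for a message.

```c
#include <assert.h>
#include <stddef.h>

#define RING_CAP 4             /* hypothetical capacity */

typedef struct {
    int    items[RING_CAP];    /* an int stands in for a message */
    size_t head;               /* index of the oldest message */
    size_t count;              /* number of stored messages */
    size_t dropped;            /* how many old messages were discarded */
} ring_t;

static void ring_init(ring_t *r)
{
    r->head = 0;
    r->count = 0;
    r->dropped = 0;
}

/* Append a message; if the ring is full, discard the oldest one first. */
static void ring_push_drop_oldest(ring_t *r, int msg)
{
    if (r->count == RING_CAP) {
        r->head = (r->head + 1) % RING_CAP;
        r->count--;
        r->dropped++;
    }
    r->items[(r->head + r->count) % RING_CAP] = msg;
    r->count++;
}

/* Remove and return the oldest message; the ring must not be empty. */
static int ring_pop(ring_t *r)
{
    assert(r->count > 0);
    int msg = r->items[r->head];
    r->head = (r->head + 1) % RING_CAP;
    r->count--;
    return msg;
}
```

With a capacity of 4, pushing six messages keeps the last four and counts
two as dropped, so C always gets the freshest backlog when it comes back
with its next request.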

Hope this helps.


Regards,
Mykola


2017-12-19 2:02 GMT+02:00 Kevin Wang <[email protected]>:

> Hello everyone,
>
> I have run into a design problem when making a subscriber class with a
> ring buffer using CZMQ.
>
> CONTEXT:
>
> The class contains a zactor, which then contains a zloop. The zloop has a
> reader for the sub socket, and on receiving a message, adds the message to
> a ring buffer, using a lock-free mechanism. This moves the "head" atomic
> int.
>
> (The built in queue for zmq does not work for me. I need the oldest
> messages to be tossed when the queue is full but ZMQ tosses all newest
> messages)
>
> The user has the ability to consume data from the buffer, which moves the
> "tail" atomic int.
>
> PROBLEM:
>
> I need a function that returns the number of available messages in the
> ring buffer, but with a timeout. Ideally, the user has a while loop that
> keeps checking the number of available messages, but with a timeout so it
> does not burn CPU usage.
>
> I have tried 2 things:
>
> 1. zmq_poll and its timeout functionality: it seems that when I call
> zmq_poll, it interferes with the zloop's polling and doesn't allow it to
> process the messages properly, so i can't poll on the sub socket on the
> user's thread.
>
> 2. timers in zloop: I have tried to create a zloop timer when the user
> asks for available_messages(), which will either timeout and return 0 or
> cancel when a message comes in first and return # of messages. The problem
> is that at 2048byte message at 30,000/sec, adding a timer slows down things
> considerably.
>
>
> I am considering using some kind of proxy, and the capture socket, but
> other than that, am out of ideas.
>
> Any thoughts? Having a completely blocking function is not possible, and
> having it return immediately is not either.
>
> Thanks to anyone that can help,
>
> Kevin
>
> _______________________________________________
> zeromq-dev mailing list
> [email protected]
> https://lists.zeromq.org/mailman/listinfo/zeromq-dev
>