Thanks Mykola, I like your proposed solution; it works great! It eliminates all the shared variables and handles the problem better than atomic ints did. I have placed the SUB socket in a zactor/zloop combination, and I am using the built-in PAIR sockets instead of REQ/REP to carry the requests and pass the data from the queue to the user.

However, I am having a problem with destroying this class. There seems to be a race condition, which I can't track down, between destroying the sockets and the user being in the middle of a zmq_poll() call.

In what order should the objects be destroyed?

Kevin


On 12/18/2017 11:25 PM, Mykola Ostrovskyy via zeromq-dev wrote:
Hello Kevin,

One of the main features of ZMQ is that it provides an alternative approach to dealing with concurrency. If you use ZMQ properly, there should be no data sharing between threads, so you don't need any lock-free or thread-safe data structures.

For your particular task, consider using two threads: Provider (P) and Consumer (C). The threads communicate over REQ-REP, and P has an additional SUB socket for receiving broadcast messages. Something like this:

--> SUB --> P --> REP <--> REQ <--> C

At startup, P subscribes to the PUB messages and starts polling on both sockets. C sends P a request for a message and polls for the reply. P receives the request from C, but since there are no messages to process, it sets a flag indicating that C is waiting and goes back to polling. Both threads are sleeping at this point, consuming no CPU.

Upon receiving a message on SUB, P adds it to a queue of incoming messages using any logic you deem necessary. It then checks whether C is waiting and, if so, forwards the head of the queue to C and clears the flag. C handles the message and sends its next request to P. If the queue is not empty, P forwards the head of the queue to C; otherwise it sets the flag.

This way only P accesses the queue, so the queue does not have to be thread-safe. C sees only one message at a time, and is either handling that message or sleeping while it waits for P's reply.

If incoming traffic is beyond what C can handle, P will know that C is still busy (the flag is not set) and can drop some messages from the queue.
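
The logic described above can be modelled without any sockets, which makes the role of the queue and the flag easier to see. This is only a minimal sketch: the names Provider, on_request, on_sub_message, send_to_consumer and max_len are hypothetical, and the drop-oldest rule is just one possible policy. In a real program the two handlers would be called from P's poll loop when the REP and SUB sockets become readable.

```python
from collections import deque


class Provider:
    """Socket-free model of P: owns the queue and the 'C is waiting' flag."""

    def __init__(self, send_to_consumer, max_len=4):
        # send_to_consumer stands in for sending a reply on the REP socket.
        self.send_to_consumer = send_to_consumer
        self.max_len = max_len          # hypothetical queue limit
        self.queue = deque()            # touched only by P, so no locking
        self.consumer_waiting = False   # True while a request from C is unanswered

    def on_request(self):
        """C asked for a message (would arrive on REP)."""
        if self.queue:
            self.send_to_consumer(self.queue.popleft())
        else:
            self.consumer_waiting = True  # nothing to send yet; back to polling

    def on_sub_message(self, msg):
        """A broadcast message arrived (would arrive on SUB)."""
        if len(self.queue) >= self.max_len:
            self.queue.popleft()          # one possible policy: drop the oldest
        self.queue.append(msg)
        if self.consumer_waiting:
            self.consumer_waiting = False
            self.send_to_consumer(self.queue.popleft())
```

Because every request from C is answered exactly once, either immediately or when the next message arrives, the model keeps to the strict request/reply alternation that REQ-REP requires.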

Hope this helps.


Regards,
Mykola


2017-12-19 2:02 GMT+02:00 Kevin Wang <[email protected]>:

    Hello everyone,

    I have run into a design problem when making a subscriber class
    with a ring buffer using CZMQ.

    CONTEXT:

    The class contains a zactor, which then contains a zloop. The
    zloop has a reader for the sub socket, and on receiving a message,
    adds the message to a ring buffer, using a lock-free mechanism.
    This moves the "head" atomic int.

    (ZMQ's built-in queue does not work for me. I need the oldest
    messages to be dropped when the queue is full, but ZMQ drops the
    newest messages instead.)

    The user has the ability to consume data from the buffer, which
    moves the "tail" atomic int.
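
The drop-oldest behaviour wanted here can be illustrated with a bounded deque from the Python standard library. This is a single-threaded sketch of the semantics only, not the lock-free ring buffer described above; the capacity of 3 and the message names are arbitrary example values.

```python
from collections import deque

# A deque with maxlen discards items from the opposite end when full,
# so appending on the right drops the oldest entry on the left.
ring = deque(maxlen=3)   # capacity of 3 is an arbitrary example value

for msg in ["m1", "m2", "m3", "m4", "m5"]:
    ring.append(msg)     # producer side: corresponds to moving the "head"

oldest = ring.popleft()  # consumer side: corresponds to moving the "tail"
available = len(ring)    # messages still waiting to be consumed
```

After the loop only the three newest messages remain, so the consumer starts from "m3" and two messages are still available.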

    PROBLEM:

    I need a function that returns the number of available messages in
    the ring buffer, but with a timeout. Ideally, the user has a while
    loop that keeps checking the number of available messages, but
    with a timeout so that it does not burn CPU.

    I have tried 2 things:

    1. zmq_poll and its timeout functionality: it seems that when I
    call zmq_poll, it interferes with the zloop's polling and doesn't
    allow it to process the messages properly, so I can't poll on the
    SUB socket from the user's thread.

    2. Timers in zloop: I have tried creating a zloop timer when the
    user calls available_messages(), which either times out and
    returns 0, or is cancelled when a message comes in first and
    returns the number of messages. The problem is that with 2048-byte
    messages at 30,000/sec, adding a timer slows things down
    considerably.


    I am considering using some kind of proxy with a capture socket,
    but other than that, I am out of ideas.

    Any thoughts? A completely blocking function is not an option,
    and neither is one that returns immediately.

    Thanks to anyone that can help,

    Kevin

    _______________________________________________
    zeromq-dev mailing list
    [email protected]
    https://lists.zeromq.org/mailman/listinfo/zeromq-dev



