Kevin,

By now you should get the gist of it: if you have some concurrency problem
-- add more sockets! :)

Not sure what exactly you have run into, but I would not recommend a hard
stop on a thread that is sleeping in poll. Instead you should let it know
that it's time to shut down, so it can release resources in a controlled
fashion. And luckily we've got a great tool for sending notifications to
threads -- sockets. It might not be the most efficient way, but I have an
extra inproc PAIR socket bound in every thread. That socket is also added
to the poll list, and when you receive a message on it you exit the polling
loop, close the current thread's sockets, do any cleanup necessary and
return from the thread function.

From the outside you'll have some stop() function that creates a
corresponding PAIR socket, connects to the one bound in the target thread,
sends the shutdown message, closes the socket and joins on the thread,
waiting for it to finish cleanup. In your case that stop() function should
send the shutdown message to both P and C, of course. You can either go
with two sockets, or get creative and reuse a single PAIR socket to send
both messages (connect to P, send, disconnect, connect to C, etc.). And
then you will have to wait for both threads to join.


Regards,
Mykola

2017-12-19 23:27 GMT+02:00 Kevin Wang <[email protected]>:

> Thanks Mykola, I like your proposed solution, it works great! It cuts out
> all the shared variables and handles them better than using atomic ints. I
> have placed the SUB socket in a zactor/zloop combination, and am using the
> built-in PAIR sockets instead of REQ/REP to communicate the requests and
> pass the data from the queue to the user.
>
> However, I am having a problem with destroying this class. There is some
> race condition that I can't find between destroying the sockets and the
> user in the middle of a zmq_poll() call.
>
> What would be the order of destroying the objects?
>
> Kevin
>
> On 12/18/2017 11:25 PM, Mykola Ostrovskyy via zeromq-dev wrote:
>
> Hello Kevin,
>
> One of the main ZMQ features is providing an alternative approach to
> dealing with concurrency. If you use ZMQ properly, there should be no data
> sharing between threads, so you don't need any lock-free thread-safe data
> structures.
>
> For your particular task, consider using two threads: Provider (P) and
> Consumer (C). The threads communicate over REQ-REP, and P has an additional
> SUB socket for receiving broadcast messages. Something like this:
>
> --> SUB --> P --> REP <--> REQ <--> C
>
> At startup P subscribes to PUB messages and starts polling on both
> sockets. C sends P a request for a message and polls for the reply. P
> receives the request from C, but since there are no messages to process it
> sets a flag indicating that C is waiting and goes back to polling. Both
> threads are sleeping at this point, with no CPU consumption.
>
> Upon receiving a message on SUB, P adds it to a queue of incoming messages
> using any logic you deem necessary. After that it checks whether C is
> waiting, and if so forwards the head of the queue to C. C handles the
> message and sends the next request to P. If the queue is not empty, P
> forwards the head of the queue to C, otherwise it sets the flag.
>
> This way only P accesses the queue, so it does not have to be thread-safe.
> C can see only one message at a time, and is either handling that message
> or sleeping while waiting for P's reply.
>
> If incoming traffic is beyond what C can handle, P will know that C is
> still busy (the flag is not set) and can drop some messages from the queue.
>
> Hope this helps.
>
>
> Regards,
> Mykola
>
>
> 2017-12-19 2:02 GMT+02:00 Kevin Wang <[email protected]>:
>
>> Hello everyone,
>>
>> I have run into a design problem when making a subscriber class with a
>> ring buffer using CZMQ.
>>
>> CONTEXT:
>>
>> The class contains a zactor, which then contains a zloop. The zloop has a
>> reader for the sub socket, and on receiving a message, adds the message to
>> a ring buffer, using a lock-free mechanism. This moves the "head" atomic
>> int.
>>
>> (The built-in queue for ZMQ does not work for me. I need the oldest
>> messages to be tossed when the queue is full, but ZMQ tosses the newest
>> messages.)
>>
>> The user has the ability to consume data from the buffer, which moves the
>> "tail" atomic int.
>>
>> PROBLEM:
>>
>> I need a function that returns the number of available messages in the
>> ring buffer, but with a timeout. Ideally, the user has a while loop that
>> keeps checking the number of available messages, but with a timeout so it
>> does not burn CPU usage.
>>
>> I have tried 2 things:
>>
>> 1. zmq_poll and its timeout functionality: it seems that when I call
>> zmq_poll, it interferes with the zloop's polling and doesn't allow it to
>> process the messages properly, so I can't poll on the SUB socket on the
>> user's thread.
>>
>> 2. timers in zloop: I have tried creating a zloop timer when the user
>> asks for available_messages(), which will either time out and return 0,
>> or be cancelled when a message comes in first and return the number of
>> messages. The problem is that at 2048-byte messages at 30,000/sec, adding
>> a timer slows things down considerably.
>>
>>
>> I am considering using some kind of proxy, and the capture socket, but
>> other than that, am out of ideas.
>>
>> Any thoughts? Having a completely blocking function is not possible, and
>> having it return immediately is not either.
>>
>> Thanks to anyone that can help,
>>
>> Kevin
>>
>> _______________________________________________
>> zeromq-dev mailing list
>> [email protected]
>> https://lists.zeromq.org/mailman/listinfo/zeromq-dev
>>
>