Kevin:

I struggled with some of these issues for a while, and documented them here: 
https://github.com/zeromq/libzmq/issues/2759.  There’s an accompanying repo 
with sample code here: 
https://github.com/WallStProg/zmqtests/tree/master/threads.

The short version is that there are some pretty “sharp” edge-cases that you 
need to be aware of if you’re trying to go beyond the sometimes overly 
simplistic or contrived examples in the docs.

Good luck, and feel free to reply back if you still have questions.

Regards,

Bill


> On Dec 19, 2017, at 6:04 PM, Mykola Ostrovskyy via zeromq-dev 
> <[email protected]> wrote:
> 
> Kevin,
> 
> By now you should get the gist of it: if you have some concurrency problem -- 
> add more sockets! :)
> 
> Not sure what exactly you have run into, but I would not recommend a hard 
> stop on a thread that is sleeping in poll. Instead, you should let it know 
> that it's time to shut down, so it can release resources in a controlled 
> fashion. And luckily we've got a great tool for sending notifications to 
> threads -- sockets. It might not be the most efficient way, but I have an 
> extra inproc PAIR socket bound on every thread. That socket is also added 
> to the poll list, and when you receive a message on that socket you exit the 
> polling loop, close the current thread's sockets, do any necessary cleanup, 
> and return from the thread function.
> 
> From outside you'll have some stop() function that creates a corresponding 
> PAIR socket, connects to the one bound in the target thread, sends the 
> shutdown message, closes the socket and joins the thread, waiting for it 
> to finish cleanup. In your case, that stop() function should send the 
> shutdown message to both P and C, of course. You can either go with two 
> sockets, or get creative and reuse a single PAIR socket to send both messages 
> (connect to P, send, disconnect, connect to C, etc). And then you will have 
> to wait for both threads to join.
> 
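A minimal sketch of the shutdown pattern described above, offered as an illustration rather than code from this thread. To stay runnable with only the Python standard library, it substitutes `socket.socketpair()` and `selectors` for the inproc PAIR sockets and `zmq_poll`; the shape of the loop is the same, but the names `worker`, `start_worker`, and `stop` are hypothetical. One difference from the description: a socketpair creates both ends at once, whereas with ZMQ the stop() function would create and connect its own PAIR socket.

```python
import selectors
import socket
import threading


def worker(data_sock, ctrl_sock, received):
    """Poll a data socket and a control socket; leave the loop on a control message."""
    sel = selectors.DefaultSelector()
    sel.register(data_sock, selectors.EVENT_READ, "data")
    sel.register(ctrl_sock, selectors.EVENT_READ, "ctrl")
    running = True
    while running:
        # Sleeps until at least one socket is readable; no busy waiting.
        for key, _ in sel.select():
            if key.data == "ctrl":
                ctrl_sock.recv(16)      # consume the shutdown message
                running = False         # finish this batch, then leave the loop
            else:
                received.append(data_sock.recv(4096))
    # Controlled cleanup: the thread closes the sockets it owns, then returns.
    sel.close()
    data_sock.close()
    ctrl_sock.close()


def start_worker():
    """Create the socket pairs and start the thread (hypothetical helper)."""
    data_rx, data_tx = socket.socketpair()
    ctrl_rx, ctrl_tx = socket.socketpair()
    received = []
    thread = threading.Thread(target=worker, args=(data_rx, ctrl_rx, received))
    thread.start()
    return thread, data_tx, ctrl_tx, received


def stop(ctrl_peer, thread):
    """Send the shutdown message, close our end, and wait for the thread's cleanup."""
    ctrl_peer.sendall(b"STOP")
    ctrl_peer.close()
    thread.join()


if __name__ == "__main__":
    thread, data_tx, ctrl_tx, received = start_worker()
    data_tx.sendall(b"tick")
    stop(ctrl_tx, thread)
    data_tx.close()
    print(received)
```

With two threads (P and C), the same stop() would be called once per thread, each with that thread's control socket, before the shared context is torn down.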
> 
> Regards,
> Mykola
> 
> 2017-12-19 23:27 GMT+02:00 Kevin Wang <[email protected] 
> <mailto:[email protected]>>:
> Thanks Mykola, I like your proposed solution, it works great! It does cut out 
> all the shared variables and deals with it better than using atomic ints. I 
> have placed the SUB socket in a zactor/zloop combination, and am using the 
> built-in PAIR sockets instead of REQ/REP to communicate the requests and pass 
> the data from the queue to the user. 
> However, I am having a problem with destroying this class. There is some race 
> condition that I can't find between destroying the sockets and the user being 
> in the middle of a zmq_poll() call. 
> In what order should the objects be destroyed? 
> 
> Kevin
> 
> On 12/18/2017 11:25 PM, Mykola Ostrovskyy via zeromq-dev wrote:
>> Hello Kevin,
>> 
>> One of the main features of ZMQ is that it provides an alternative approach 
>> to dealing with concurrency. If you use ZMQ properly, there should be no data 
>> sharing between threads, so you don't need any lock-free thread-safe data 
>> structures.
>> 
>> For your particular task, consider using two threads: Provider (P) and 
>> Consumer (C). The threads communicate over REQ-REP, and P has an additional 
>> SUB socket for receiving broadcast messages. Something like this:
>> 
>> --> SUB --> P --> REP <--> REQ <--> C
>> 
>> At startup, P subscribes to PUB messages and starts polling on both 
>> sockets. C sends P a request for a message and polls for the reply. P 
>> receives the request from C, but since there are no messages to process, it 
>> sets a flag indicating that C is waiting and goes back to polling. Both 
>> threads are sleeping at this point, with no CPU consumption.
>> 
>> Upon receiving a message on SUB, P adds it to a queue of incoming messages 
>> using any logic you deem necessary. After that, it checks whether C is 
>> waiting and, if so, forwards the head of the queue to C. C handles the 
>> message and sends the next request to P. If the queue is not empty, P 
>> forwards the head of the queue to C; otherwise it sets the flag.
>> 
>> This way, only P accesses the queue, so it does not have to be thread safe. 
>> C can see only one message at a time, and is either handling that message or 
>> sleeping while waiting for P's reply.
>> 
>> If incoming traffic is beyond what C can handle, P will know that C is still 
>> busy (flag is not set) and can drop some messages from the queue.
>> 
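The bookkeeping that P does in the scheme above can be sketched without any sockets. This is an illustrative sketch only: the class name `Provider`, its method names, and the drop-oldest policy (via `collections.deque(maxlen=...)`) are assumptions, since the description leaves the queueing logic open. A return value of `None` means P sends nothing yet and goes back to polling; any other value is the reply P would send to C on the REP socket.

```python
from collections import deque


class Provider:
    """State touched only by thread P: the queue and the 'C is waiting' flag."""

    def __init__(self, capacity):
        # Assumed drop policy: a full deque discards its oldest entry on append.
        self.queue = deque(maxlen=capacity)
        self.consumer_waiting = False

    def on_sub_message(self, msg):
        """Called when SUB delivers a message. Returns a reply for C, or None."""
        self.queue.append(msg)
        if self.consumer_waiting:
            self.consumer_waiting = False
            return self.queue.popleft()
        return None

    def on_request(self):
        """Called when C's request arrives. Returns a reply for C, or None."""
        if self.queue:
            return self.queue.popleft()
        self.consumer_waiting = True   # nothing to send; remember that C is idle
        return None
```

Because both handlers run on P's thread, from P's poll loop, neither the queue nor the flag needs a lock.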
>> Hope this helps.
>> 
>> 
>> Regards,
>> Mykola
>> 
>> 
>> 2017-12-19 2:02 GMT+02:00 Kevin Wang <[email protected] 
>> <mailto:[email protected]>>:
>> Hello everyone,
>> 
>> I have run into a design problem when making a subscriber class with a ring 
>> buffer using CZMQ.
>> 
>> CONTEXT:
>> 
>> The class contains a zactor, which then contains a zloop. The zloop has a 
>> reader for the sub socket, and on receiving a message, adds the message to a 
>> ring buffer, using a lock-free mechanism. This moves the "head" atomic int.
>> 
>> (The built-in queue for zmq does not work for me. I need the oldest messages 
>> to be tossed when the queue is full, but ZMQ tosses the newest messages.)
>> 
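For reference, the drop-oldest behaviour asked for here can be sketched as a plain ring buffer with "head" and "tail" counters. This is a single-threaded illustration under assumed names (`RingBuffer`, `push`, `pop`, `available`); it is not the lock-free implementation from the original class, which is not shown in this thread. Note that `push` advances the tail when the buffer is full, which is the step a lock-free version would have to coordinate between the two threads.

```python
class RingBuffer:
    """Fixed-capacity buffer that overwrites the oldest entry when full."""

    def __init__(self, capacity):
        self.slots = [None] * capacity
        self.head = 0   # total number of messages written so far
        self.tail = 0   # total number of messages consumed or discarded so far

    def available(self):
        """Number of messages currently waiting to be consumed."""
        return self.head - self.tail

    def push(self, msg):
        if self.available() == len(self.slots):
            self.tail += 1                          # full: discard the oldest
        self.slots[self.head % len(self.slots)] = msg
        self.head += 1

    def pop(self):
        """Return the oldest message, or None if the buffer is empty."""
        if self.available() == 0:
            return None
        msg = self.slots[self.tail % len(self.slots)]
        self.tail += 1
        return msg
```

Pushing five messages into a buffer of capacity three leaves the last three available, in arrival order.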
>> The user has the ability to consume data from the buffer, which moves the 
>> "tail" atomic int.
>> 
>> PROBLEM:
>> 
>> I need a function that returns the number of available messages in the ring 
>> buffer, but with a timeout. Ideally, the user has a while loop that keeps 
>> checking the number of available messages, but with a timeout so it does not 
>> burn CPU.
>> 
>> I have tried 2 things:
>> 
>> 1. zmq_poll and its timeout functionality: it seems that when I call 
>> zmq_poll, it interferes with the zloop's polling and doesn't allow it to 
>> process the messages properly, so I can't poll on the sub socket on the 
>> user's thread.
>> 
>> 2. timers in zloop: I have tried to create a zloop timer when the user asks 
>> for available_messages(), which will either time out and return 0, or be 
>> cancelled when a message comes in first and return the number of messages. 
>> The problem is that with 2048-byte messages at 30,000/sec, adding a timer 
>> slows things down considerably.
>> 
>> 
>> I am considering using some kind of proxy, and the capture socket, but other 
>> than that, am out of ideas.
>> 
>> Any thoughts? Having a completely blocking function is not possible, and 
>> having it return immediately is not either.
>> 
>> Thanks to anyone that can help,
>> 
>> Kevin
>> 
>> _______________________________________________
>> zeromq-dev mailing list
>> [email protected] <mailto:[email protected]>
>> https://lists.zeromq.org/mailman/listinfo/zeromq-dev 
>> <https://lists.zeromq.org/mailman/listinfo/zeromq-dev>

