Re: [Txamqp-user] How do you listen to different queues asynchronously?
> "Esteve" == Esteve Fernandez writes: Esteve> there's probably something I'm missing here, but why are you Esteve> defining a function and attaching it to an errback if you're Esteve> already using inlineCallbacks? This looks cleaner to me: I wasn't understanding Ale's situation. We talked later on IRC and I told him to do as you suggest - just wrap in a try/except. T ___ Mailing list: https://launchpad.net/~txamqp-user Post to : txamqp-user@lists.launchpad.net Unsubscribe : https://launchpad.net/~txamqp-user More help : https://help.launchpad.net/ListHelp
Re: [Txamqp-user] How do you listen to different queues asynchronously?
Hi Esteve, 2009/11/17 Esteve Fernandez : > and then, when the user quits the application, put the STOP_TOKEN > object in the queue: > > queue.put(STOP_TOKEN) > > remember that when you call queue() on a channel instance, you get an > instance of DeferredQueue > (http://twistedmatrix.com/documents/8.2.0/api/twisted.internet.defer.DeferredQueue.html), > so you're free to put whatever you want in it without having to send > anything through the network :-) > That's a very good idea. I think it's much cleaner than raising an exception or the stuff with the deferreds. Thanks a lot Esteve. it's admirable your (Terry and you) efforts to help strangers on the internet :-) -- Ale. ___ Mailing list: https://launchpad.net/~txamqp-user Post to : txamqp-user@lists.launchpad.net Unsubscribe : https://launchpad.net/~txamqp-user More help : https://help.launchpad.net/ListHelp
Re: [Txamqp-user] How do you listen to different queues asynchronously?
Hi Terry On Mon, Nov 16, 2009 at 1:26 AM, Terry Jones wrote: > @inlineCallbacks > def consumer_fanout(conn, chan, queue, sig): > > done = False > > def _cancel(failure): > # failure.trap(SomeErrorType) > done = True > return chan.basic_cancel("testtag_fanout") > > while not done: > d = queue.get() > d.addErrback(_cancel) > msg = yield d > print '[FANOUT] Received: ' + msg.content.body + ... > > And you might want to use failure.trap in _cancel to make sure you have a > Closed error, supposing you care. there's probably something I'm missing here, but why are you defining a function and attaching it to an errback if you're already using inlineCallbacks? This looks cleaner to me: @inlineCallbacks def consumer_fanout(conn, chan, queue, sig): done = False while not done: try: msg = yield queue.get() print '[FANOUT] Received: ' + msg.content.body + ... except: # you may replace this with except SomeErrorType done = True yield chan.basic_cancel("testtag_fanout") Cheers. ___ Mailing list: https://launchpad.net/~txamqp-user Post to : txamqp-user@lists.launchpad.net Unsubscribe : https://launchpad.net/~txamqp-user More help : https://help.launchpad.net/ListHelp
Re: [Txamqp-user] How do you listen to different queues asynchronously?
Hi Ale On 11/16/09, Ale wrote: >> Yeah :-), I was thinking more of breaking the consumer loop because >> I'm quiting my program and I want to end cleanly. Maybe using a global >> boolean variable? Sending a STOP message from the consumer to >> terminate that same process doesn't look right. IMHO, a cleaner approach is to do: STOP_TOKEN = object() @inlineCallbacks def consumer_fanout(conn, chan, queue): while True: msg = yield queue.get() if msg is STOP_TOKEN: queue.close() break print '[FANOUT] Received: ' + msg.content.body + ' from channel #' + str(chan.id) yield chan.basic_cancel("testtag_fanout") and then, when the user quits the application, put the STOP_TOKEN object in the queue: queue.put(STOP_TOKEN) remember that when you call queue() on a channel instance, you get an instance of DeferredQueue (http://twistedmatrix.com/documents/8.2.0/api/twisted.internet.defer.DeferredQueue.html), so you're free to put whatever you want in it without having to send anything through the network :-) Cheers. ___ Mailing list: https://launchpad.net/~txamqp-user Post to : txamqp-user@lists.launchpad.net Unsubscribe : https://launchpad.net/~txamqp-user More help : https://help.launchpad.net/ListHelp
Re: [Txamqp-user] How do you listen to different queues asynchronously?
Hi Terry, This is getting interesting :-) and therefore I'm starting to think I'm missing something, or didn't fully understand the API. 2009/11/15 Terry Jones : > I'm not 100% sure I understand what you want or what your situation is, but > maybe something like this is what you want: while True: msg = yield queue.get() if msg.content.body == "STOP" break I could either end the ^ loop sending a stop message, or I could count how many messages where sent and top the loop. But my client just has to continue listening for messages until somebody presses a quit button. When pressing the quit button I could send the stop message but, at least for me, it seems silly that I have to send a message over the network to stop the client that's running on my machine. Futhermore I don't know how many messages I'm going to get. As far as I understand: msg = yield queue.get() won't return until I send a message. So the things I've thought are: * using queue.close() although I didn't understand how it worked... I did something like this while True: try: msg = yield queue.get() except Closed: break called queue.closed() from some other place * using what I sent before. * just stopping the reactor? (that's ugly) > > @inlineCallbacks > def consumer_fanout(conn, chan, queue, sig): > > done = False > > def _cancel(failure): > # failure.trap(SomeErrorType) > done = True > return chan.basic_cancel("testtag_fanout") > > while not done: > d = queue.get() > d.addErrback(_cancel) > msg = yield d > print '[FANOUT] Received: ' + msg.content.body + ... > As far as I understand this will end the loop when an error occurs. I want to break the loop some how without sending a message through the network to stop this consumer. Hope I explained myself. I'm going to try again with queue.close() see how that goes. Sorry for asking so many questions... Cheers, -- Ale. ___ Mailing list: https://launchpad.net/~txamqp-user Post to : txamqp-user@lists.launchpad.net Unsubscribe : https://launchpad.net/~txamqp-user More help : https://help.launchpad.net/ListHelp
Re: [Txamqp-user] How do you listen to different queues asynchronously?
> I was looking for a way to stop the: > > while True: >msg = yield queue.get() > > What should I do if the loop is just stuck in yield queue.get() and I > want to cancel it? > > My program is a GTK2 app that keeps listening to messages and I would > like to stop listening when I close the app. So I "invented" that. The > trouble with it is that queue.get() throws Closed so I have to add an > errback to trap it. I'm not 100% sure I understand what you want or what your situation is, but maybe something like this is what you want: @inlineCallbacks def consumer_fanout(conn, chan, queue, sig): done = False def _cancel(failure): # failure.trap(SomeErrorType) done = True return chan.basic_cancel("testtag_fanout") while not done: d = queue.get() d.addErrback(_cancel) msg = yield d print '[FANOUT] Received: ' + msg.content.body + ... And you might want to use failure.trap in _cancel to make sure you have a Closed error, supposing you care. Terry ___ Mailing list: https://launchpad.net/~txamqp-user Post to : txamqp-user@lists.launchpad.net Unsubscribe : https://launchpad.net/~txamqp-user More help : https://help.launchpad.net/ListHelp
Re: [Txamqp-user] How do you listen to different queues asynchronously?
2009/11/15 Terry Jones : Hi Terry, Thanks for your response > If you want a deferred that fires with the first of n other deferreds, you > can just do this: > > from twisted.internet import defer > > d = defer.DeferredList([def1, def2], fireOnOneCallback=True) I'll look into it and see if that helps. I was looking for a way to stop the: while True: msg = yield queue.get() What should I do if the loop is just stuck in yield queue.get() and I want to cancel it? My program is a GTK2 app that keeps listening to messages and I would like to stop listening when I close the app. So I "invented" that. The trouble with it is that queue.get() throws Closed so I have to add an errback to trap it. All try out your sugestion, thanks Terry. Cheers, -- Ale. ___ Mailing list: https://launchpad.net/~txamqp-user Post to : txamqp-user@lists.launchpad.net Unsubscribe : https://launchpad.net/~txamqp-user More help : https://help.launchpad.net/ListHelp
Re: [Txamqp-user] How do you listen to different queues asynchronously?
> "Ale" == Ale writes: Ale> This is the sort of thing I wanted to achieve but without trapping Ale> exceptions. I don't know how "dirty" this is (a snip, for the whole Ale> file see paste): Ale> def whichEverFiresFirst(def1, def2): Hi Ale If you want a deferred that fires with the first of n other deferreds, you can just do this: from twisted.internet import defer d = defer.DeferredList([def1, def2], fireOnOneCallback=True) and d will be called with a tuple containing the result of the deferred that fired first and an index indicating which deferred it was (an index into the list of deferreds you pass to DeferredList). Note that this deferred (i.e., the DeferredList) cannot errback in this case. You should take care of errors that could occur in your deferreds. It might help to closely read some of the code in twisted/internet/defer.py That helped me a lot. I guess there's no real substitute. Terry ___ Mailing list: https://launchpad.net/~txamqp-user Post to : txamqp-user@lists.launchpad.net Unsubscribe : https://launchpad.net/~txamqp-user More help : https://help.launchpad.net/ListHelp
Re: [Txamqp-user] How do you listen to different queues asynchronously?
> This is the sort of thing I wanted to achieve but without trapping > exceptions. I don't know how "dirty" this is (a snip, for the whole > file see paste): > Sorry the paste is here: http://dpaste.com/hold/120891/ Cheers, -- Ale. ___ Mailing list: https://launchpad.net/~txamqp-user Post to : txamqp-user@lists.launchpad.net Unsubscribe : https://launchpad.net/~txamqp-user More help : https://help.launchpad.net/ListHelp
Re: [Txamqp-user] How do you listen to different queues asynchronously?
>> It depends on how you want it to be terminated :-) You can either keep >> a counter or send a special message (like STOP). >> > > Yeah :-), I was thinking more of breaking the consumer loop because > I'm quiting my program and I want to end cleanly. Maybe using a global > boolean variable? Sending a STOP message from the consumer to > terminate that same process doesn't look right. > > The idea is that I have a bunch of users in a "conversation" (sending > a receiving messages) and any of the users can leave the > "conversation" any time without disturbing others. > This is the sort of thing I wanted to achieve but without trapping exceptions. I don't know how "dirty" this is (a snip, for the whole file see paste): def whichEverFiresFirst(def1, def2): d = Deferred() def _callback(obj, d): d.callback(obj) def _errback(failure): failure.trap(Closed) def1.addCallback(_callback, d) def1.addErrback(_errback) def2.addCallback(_callback, d) return d class StopSignal(object): def get_signal(self): self.d = Deferred() return self.d def fire_signal(self): self.d.callback("STOP") @inlineCallbacks def consumer_fanout(conn, chan, queue, sig): while True: msg = yield whichEverFiresFirst(queue.get(), sig.get_signal()) if isinstance(msg, str) and msg == "STOP": print "-> STOP SIGNAL <-" queue.close() break print '[FANOUT] Received: ' + msg.content.body + ' from channel #' + str(chan.id) yield chan.basic_cancel("testtag_fanout") Cheers, -- Ale. ___ Mailing list: https://launchpad.net/~txamqp-user Post to : txamqp-user@lists.launchpad.net Unsubscribe : https://launchpad.net/~txamqp-user More help : https://help.launchpad.net/ListHelp
Re: [Txamqp-user] How do you listen to different queues asynchronously?
Hi Esteve, First thanks a lot for replying my email. Comments in-line. 2009/11/9 Esteve Fernandez : [snip] >> The code which I sent both queues are on the same exchange. > > I modified the last snippet I sent, and added another set of > (exchange, queue, consumer), but for a fanout exchange in this case: > > http://dpaste.com/hold/118490/ >> Is this ok? Is there a better way? Should I use threads here? > > Whenever you feel the temptation of using threads, remember they are > only needed for blocking code :-) Most of the time you can achieve the > same rewriting your code using timed events. I did tried to use gatherResults in other attempts but I was confused. Help me understand: gatherResults waits for all the deferreds to fire (or in this case for all the functions in the list to return). But since the the functions are working on loops waiting for events gatherResults wont return until the producers send STOP and the consumers break the loop. Is that right? Am sorry if I ask this twisted question on the list but they are closely related. > >> Another question I've is what about terminating the connection: >> How should I end the connection without sending the STOP message? > > It depends on how you want it to be terminated :-) You can either keep > a counter or send a special message (like STOP). > Yeah :-), I was thinking more of breaking the consumer loop because I'm quiting my program and I want to end cleanly. Maybe using a global boolean variable? Sending a STOP message from the consumer to terminate that same process doesn't look right. The idea is that I have a bunch of users in a "conversation" (sending a receiving messages) and any of the users can leave the "conversation" any time without disturbing others. > Hope that helped. Yes indeed it helped a lot. Cheers, -- Ale. ___ Mailing list: https://launchpad.net/~txamqp-user Post to : txamqp-user@lists.launchpad.net Unsubscribe : https://launchpad.net/~txamqp-user More help : https://help.launchpad.net/ListHelp
Re: [Txamqp-user] How do you listen to different queues asynchronously?
Hi Ale On 11/9/09, Ale wrote: > I've been trying to get a consumer to listen to two channels using > LoopingCall: http://dpaste.com/hold/118305/. The code is modified from > the original examples. LoopingCall is meant for situations where you need to run something at periodic intervals, however things like peeking from a queue, are better handled by using events. The DeferredQueue provides an event-driven mechanism for using queues, a call to get will return a Deferred that will be fired when there's data in the queue (by calling put) > What I wanted to achieve was to listen to > different queues asynchronously (on one channel each queue). You don't need separate channels for using more than one queue. Queues are global to the broker (more specifically, to the virtual host in the broker), however consumers are not shared between channels or connections, though. > The code which I sent both queues are on the same exchange. I modified the last snippet I sent, and added another set of (exchange, queue, consumer), but for a fanout exchange in this case: http://dpaste.com/hold/118490/ > Is this ok? Is there a better way? Should I use threads here? Whenever you feel the temptation of using threads, remember they are only needed for blocking code :-) Most of the time you can achieve the same rewriting your code using timed events. > Another question I've is what about terminating the connection: > How should I end the connection without sending the STOP message? It depends on how you want it to be terminated :-) You can either keep a counter or send a special message (like STOP). Hope that helped. Cheers. ___ Mailing list: https://launchpad.net/~txamqp-user Post to : txamqp-user@lists.launchpad.net Unsubscribe : https://launchpad.net/~txamqp-user More help : https://help.launchpad.net/ListHelp