On Thu, 29 Jun 2006, Tino Dai wrote:

> Between the producer and consumer threads, does the consumer end of the
> queue sit there and wait for something to come down the queue...
Yes. The following call:

    workunit = self.inQ.get(True)

means: try to take something off the queue, and if the queue is empty, wait until something is put onto the queue by another thread. That forced waiting ("blocking") is the behavior that I think you want.

Another ("non-blocking") form (that I didn't use) would be:

    workunit = self.inQ.get(False)

or, equivalently, self.inQ.get_nowait(). This one does *not* wait if the queue is empty. It either returns immediately, if there's something in the queue to get, or it raises the Queue.Empty exception if there isn't. (Note that a bare self.inQ.get() is still the blocking form, because the block argument defaults to True.) If you use the non-blocking form, I think you're back to requiring your semaphores.

The blocking form is particularly convenient for the type of application you're talking about, I think. (I've never actually had occasion to use the non-blocking form, but then, my needs are usually pretty simple.)

There's also a temporarily blocking form, e.g.:

    workunit = self.inQ.get(True, 10)

This tries to get something from the queue, and either returns something immediately, or, if nothing's on the queue, waits for up to 10 (in this example) seconds. If something shows up in that time, it wakes up and returns the element off the queue; otherwise it raises Queue.Empty if the queue is still empty. For our purposes, this has the same problems as the non-blocking form, and I'd avoid it.

By the way, do not try to use a combination of the non-blocking form and Queue.empty() to determine whether something's there to get, and get only if the queue is non-empty. Queue.empty() is not reliable for synchronization purposes. I would consider using Queue.empty() only if, for example, I had some sort of monitoring thread that periodically checked whether there was anything in the queue, solely for the purpose of providing tuning data (i.e., "90% of the time this queue is non-empty, so maybe I should consider having more consumer threads on it"). Even there, I'd probably use qsize() instead (which is also unreliable for synchronization purposes). You're best off just pretending that Queue.empty(), Queue.full() and Queue.qsize() don't exist, as far as synchronization is concerned.

> or is the consumer wake up after a randompause()?

The randompause() is *only* there to insert random pauses into the execution, so the threads have a little bit of a chance of executing out of order, in essence simulating the threads' work taking some unpredictable amount of time. In practice, it doesn't usually have any effect. You wouldn't use randompause() in your code; it's just for simulation purposes.

> Right now, I have the semaphores as gatekeepers to each one of the
> threads. And until something is in the queue, the thread's semaphore
> will wait for the semphore to be released by the previous thread.

Right. That's an unnecessary complication, requiring that you manage the inter-thread communication yourself. If you instead use blocking queue reads, you won't have to do that. The blocking will make sure that each thread wakes up and does work only when there's work for it.

By the way, I'm not a professional programmer (I'm a lawyer who programs pretty much as a hobby), and it shows in this code. My first cut had some errors because I really don't get namespaces very well. I adjusted it to get it to work, but not very well; very kludgily. Here's a more appropriate version (not that it's a paragon of good code now, but it's not as, um, eccentric as last night's). I've also changed it to get rid of the id variable being carried in the thread instance. Instead, I use setName to name a thread, and getName to access that name.
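To make the difference between the three forms concrete, here's a minimal sketch (assuming Python 2's Queue module; the queue name q is just for illustration):

    import Queue

    q = Queue.Queue()
    q.put("work")

    # Blocking form: sleeps inside get() until another thread put()s something.
    item = q.get(True)

    # Non-blocking form: returns at once, so you must be ready for Queue.Empty.
    try:
        item = q.get(False)      # same as q.get_nowait()
    except Queue.Empty:
        item = None              # nothing was waiting

    # Timed form: waits up to 10 seconds, then raises Queue.Empty.
    # (In this sketch the queue is already empty, so it waits the full 10s.)
    try:
        item = q.get(True, 10)
    except Queue.Empty:
        item = None

    # Don't do this instead -- another thread can empty the queue between
    # the empty() check and the get(), which is exactly why empty() is
    # unreliable for synchronization:
    #
    #     if not q.empty():
    #         item = q.get(False)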
This is better, because then a meaningful thread name will show up in any exception messages you get (e.g., "Exception in thread first:").

import time, Queue, threading, random

class processor(threading.Thread):
    def __init__(self, id, inQ=None, outQ=None, inmessage=None):
        """
        id is thread Id: "first" for initial producer, "last" for final consumer
        inQ is input Q, or None for first producer
        outQ is outputQ, or None for final consumer
        """
        self.inQ = inQ
        self.outQ = outQ
        if id == "first":
            self.l = list(inmessage)
        if id == "last":
            self.message = ""
        threading.Thread.__init__(self)
        self.setName(id)

    def run(self):
        threadname = self.getName()
        if threadname == "first":
            self.producer()
        elif threadname == "last":
            self.consumer()
        else:
            self.hybrid()

    def producer(self):
        while True:
            self.randompause()
            try:
                workunit = self.l.pop(0)
            except IndexError:
                self.outQ.put("stop")
                self.endmessage()
                return
            self.statusmessage(workunit)
            self.outQ.put(workunit)

    def consumer(self):
        while True:
            self.randompause()
            workunit = self.inQ.get(True)
            if workunit == "stop":
                print "final message:", self.message
                self.endmessage()
                return
            else:
                self.statusmessage(workunit)
                self.message = self.message + workunit

    def hybrid(self):
        while True:
            self.randompause()
            workunit = self.inQ.get(True)
            if workunit == "stop":
                self.outQ.put(workunit)
                self.endmessage()
                return
            else:
                self.statusmessage(workunit)
                self.outQ.put(workunit)

    def randompause(self):
        time.sleep(random.randint(1, 5))

    def statusmessage(self, workunit):
        print "%s thread %s, workunit %s" % (time.asctime(), self.getName(), workunit)

    def endmessage(self):
        print "%s thread %s ending" % (time.asctime(), self.getName())

if __name__ == "__main__":
    q_ab = Queue.Queue()
    pa = processor(id="first", inmessage="spam", outQ=q_ab)
    q_bc = Queue.Queue()
    pb = processor(id="second", inQ=q_ab, outQ=q_bc)
    q_cd = Queue.Queue()
    pc = processor(id="third", inQ=q_bc, outQ=q_cd)
    q_de = Queue.Queue()
    pd = processor(id="fourth", inQ=q_cd, outQ=q_de)
    pe = processor(id="last", inQ=q_de)
    pa.start()
    pb.start()
    pc.start()
    pd.start()
    pe.start()
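One thing the example above doesn't do is wait for the pipeline to drain before the main program falls off the end (the worker threads keep the process alive on their own, since they aren't daemon threads). If you wanted the main program to block until every stage has seen the "stop" sentinel and returned, a minimal addition at the bottom would be something like:

    # Wait for each stage to return from run() after passing along "stop".
    for p in (pa, pb, pc, pd, pe):
        p.join()
    print "pipeline finished"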