Re: multiprocessing in a while loop?

2014-06-27 Thread Jesse Adam

Could you post 
a) what the output looks like now (sans the logging part)
b) what output do you expect


In any event, this routine does not look right to me:

def consume_queue(queue_name):
  conn = boto.connect_sqs()
  q = conn.get_queue(queue_name)
  m = q.read()
  while m is not None:
yield m
q.delete_message(m)
logger.debug('message deleted')
m = q.read()
-- 
https://mail.python.org/mailman/listinfo/python-list


multiprocessing in a while loop?

2014-05-06 Thread Johan Llewellyn
hi, I am struggling to understand how to leverage python's multiprocessing 
module in a while loop.  the examples I have found seem to assume it is known 
ahead of time how many items need to be processed.

specifically, I am reading from an external queue.  I currently process items 
one at a time until the queue is empty.  I wrote a wrapper function to handle 
connecting to the queue, pulling the next message, and deleting it when I am 
done.  ideally, I'd like to modify this wrapper function to take an additional 
argument (processes) to specify the number of messages to process 
simultaneously.

I've attached a script that captures what I am doing now.  unfortunately, the 
external queue object is not publicly accessible and I'm not quite sure how to 
set up a local object that would support testing.  any suggestions would be 
most welcome.


thanks,
Johan
#!/usr/bin/env python

import boto
import logging

LOG_FMT = '%(asctime)s - %(levelname)s - %(module)s.%(funcName)s - %(message)s'
QUEUE = 'my-queue'

logger = logging.getLogger(__name__)


# read from an external queue; items may be added during processing
def consume_queue(queue_name):
  conn = boto.connect_sqs()
  q = conn.get_queue(queue_name)
  m = q.read()
  while m is not None:
yield m
q.delete_message(m)
logger.debug('message deleted')
m = q.read()


# high variability in message processing times (seconds - hours)
def handle_message(message)
  s = message.get_body()
  logger.info(s)


def main():
  for message in consume_queue(QUEUE):
handle_message(message)


if __name__ == '__main__':
  logging.basicConfig(level=logging.DEBUG, format=LOG_FMT)
  main()

-- 
https://mail.python.org/mailman/listinfo/python-list