On 15 Jan 2010, at 21:50, Anh Hai Trinh wrote:

Hello all,

I'd like to point out an alternative module with respect to
asynchronous computation: `stream` (which I wrote) supports
ThreadPool, ProcessPool and Executor with a simpler API and
implementation.

Neat!

I'm not sure that I'd agree with the simpler API part though :-)

My module takes a list-processing oriented view in which a
ThreadPool/ProcessPool is simply a way of working with each stream
element concurrently and outputting results, possibly out of order.

A trivial example is:

 from stream import map
 range(10) >> ThreadPool(map(lambda x: x*x)) >> sum
 # returns 285

I think that you are probably missing an import. The equivalent using futures would be:

from futures import ThreadPoolExecutor

sum(ThreadPoolExecutor(10).map(lambda x: x*x, range(10)))
# returns 285



The URL-retrieving example is:

 import urllib2
 from stream import ThreadPool

 URLs = [
    'http://www.cnn.com/',
    'http://www.bbc.co.uk/',
    'http://www.economist.com/',
    'http://nonexistant.website.at.baddomain/',
    'http://slashdot.org/',
    'http://reddit.com/',
    'http://news.ycombinator.com/',
 ]

 def retrieve(urls, timeout=10):
    for url in urls:
       yield url, urllib2.urlopen(url, timeout=timeout).read()

 if __name__ == '__main__':
    retrieved = URLs >> ThreadPool(retrieve, poolsize=len(URLs))
    for url, content in retrieved:
       print '%r is %d bytes' % (url, len(content))
    for url, exception in retrieved.failure:
       print '%r failed: %s' % (url, exception)


Note that the main argument to ThreadPool is an iterator-processing
function: one that takes an iterator and returns an iterator. A
ThreadPool/ProcessPool simply distributes the input to workers running
such a function and gathers their output as a single stream.
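For illustration, such a function can be written by hand as a plain
generator (the name `square` below is mine, not from stream's docs):

 from stream import ThreadPool

 def square(iterator):
    # takes an iterator, returns an iterator: one output per input item
    for x in iterator:
       yield x * x

 range(10) >> ThreadPool(square) >> sum
 # returns 285, like the map() example above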

"retrieve" seems to take multiple url arguments. Does ThreadPool using some sort of balancing strategy if poolsize where set to < len(URLs)?

One important difference between `stream` and `futures` is the order
of returned results.  The pool object itself is an iterable, and the
returned iterator's `next()` call unblocks as soon as there is an
output value.  The order of output is the order of job completion,
whereas for `futures.run_to_results()`, the order of the returned
iterator follows the submitted FutureList --- this means that if the
first item takes a long time to complete, subsequent processing of the
output cannot benefit from other results that are already available.

Right, which is why futures has an as_completed() function. One difference between the two implementations is that stream remembers the arguments that it is processing while futures discards them when it doesn't need them. This was done for memory consumption reasons, but the stream approach seems to lead to simpler code.
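For example, a rough sketch of consuming results in completion order,
reusing the URLs list from the example above (I'm assuming a
submit()-style way of creating individual futures here, plus a helper
retrieve_one of my own; the exact spelling in the futures API may
differ):

 import urllib2
 from futures import ThreadPoolExecutor, as_completed

 def retrieve_one(url, timeout=10):
    # fetch a single URL; one job per URL
    return urllib2.urlopen(url, timeout=timeout).read()

 executor = ThreadPoolExecutor(len(URLs))
 # map each future back to its URL so failures can be reported too
 fs = dict((executor.submit(retrieve_one, url), url) for url in URLs)
 for future in as_completed(fs):
    try:
       print '%r is %d bytes' % (fs[future], len(future.result()))
    except Exception, e:
       # result() re-raises the exception from a failed job
       print '%r failed: %s' % (fs[future], e)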


The other difference is that there is absolutely no abstraction, just
two bare iterables for client code to deal with: one iterable over the
results, and one iterable over the failures; both are thread-safe.

If fine-grained job control is necessary, an Executor can be used. It
is implemented on top of the pool, and offers submit(*items), which
returns job ids to be used with cancel() and status().  Jobs can be
submitted and canceled concurrently.
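A rough sketch of what that might look like (the Executor constructor
and the exact signatures of cancel() and status() are my guesses from
the description above, not taken from stream.py's docs):

 from stream import Executor, ThreadPool, map

 # guessed constructor: an Executor wrapping a ThreadPool that runs a
 # given iterator-processing function
 executor = Executor(ThreadPool, map(lambda x: x*x))

 ids = executor.submit(*range(10))   # one job id per submitted item
 executor.cancel(ids[-1])            # cancel a job by its id
 print executor.status(ids[0])       # query a job's state by its id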

What type is each "item" supposed to be?

Can I wait on several items? What if they are created by different executors?

Cheers,
Brian

The documentation is available at <http://www.trinhhaianh.com/stream.py>.

The code repository is located at <http://github.com/aht/stream.py>.
The implementation of ThreadPool, ProcessPool and Executor is little
more than 300 lines of code.


Peace,

--
// aht
http://blog.onideas.ws
