Hi again,

Greg asked why I used the concurrent.futures module rather than the multiprocessing module, which has been part of the standard library since Python 2.6.
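(For anyone who doesn't have yesterday's post handy, here's a rough sketch of the pieces the snippets below assume: a "generateconformations(m, n)" helper built on RDKit's EmbedMultipleConfs, plus the SD supplier and writer. Treat it as illustrative; the details may differ from what I actually posted.)

    # Illustrative setup for the examples below; details may differ
    # from yesterday's script.
    import sys
    from rdkit import Chem
    from rdkit.Chem import AllChem

    def generateconformations(m, n):
        # Add hydrogens, embed n conformers, and return the molecule
        # along with the list of new conformer ids.
        m = Chem.AddHs(m)
        ids = AllChem.EmbedMultipleConfs(m, numConfs=n)
        return m, list(ids)

    n = int(sys.argv[1])                     # number of conformers per molecule
    suppl = Chem.SDMolSupplier(sys.argv[2])  # input structures
    writer = Chem.SDWriter(sys.argv[3])      # conformer output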
There are a few differences in the API which make the futures module more interesting. First off, here's how you could write the same process pool part using the existing multiprocessing module:

    from multiprocessing import Pool

    p = Pool(5)
    for mol, ids in p.map(generateconformations, zip(suppl, [n]*len(suppl))):
        for id in ids:
            writer.write(mol, confId=id)

I have to use the "zip" because Pool.map(func, iterable[, chunksize]) only takes a single iterable. This also means I need to change the "generateconformations" function so that it takes a single element as input, which is a 2-element tuple of the molecule and the count. (That is, change from "def generateconformations(m, n):" to "def generateconformations((m, n)):".) That's a touch uglier, but doable.

Now, when I posted the code yesterday, I should have posted the simplest version, which is:

    with futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        for mol, ids in executor.map(generateconformations, suppl, [n]*len(suppl)):
            for id in ids:
                writer.write(mol, confId=id)

Then Greg wouldn't have asked me about how complex my code was. ;)

This is the easiest to understand. You can see that this API supports multiple iterables: I used [n]*len(suppl) to make a new list containing repeats of the count, so I could pass the twin iterables of the molecules and the counts. This is a bit simpler than the multiprocessing code. In addition, the "with" statement knows how to work with an executor. Here it means that all submitted jobs must finish before leaving the with block, and the process pool will be shut down, even if there's an exception. With the multiprocessing module, you need to manage that yourself, or trust in the memory manager.

But yesterday I wrote something more like this:

    # Submit a set of asynchronous jobs
    jobs = []
    for mol in suppl:
        if mol:
            job = executor.submit(generateconformations, mol, n)
            jobs.append(job)

    # Process the job results (in submission order) and save the conformers.
    for job in jobs:
        mol, ids = job.result()
        for id in ids:
            writer.write(mol, confId=id)

The "submit" immediately returns a 'future' object, which is called a "promise" in some other languages. You can ask for its .result() to get its result. That call will block (up to an optional timeout) if the result isn't there yet. You can also check whether a result is ready without blocking (there's a short sketch of that below, after the progress bar example).

The reason I did this is because I usually 1) show a progress bar and 2) have enough memory to store all the results. I've enjoyed using the 'progressbar' module from http://pypi.python.org/pypi/progressbar/ . I have code which looks like this:

    with futures.ProcessPoolExecutor(max_workers=4) as executor:
        jobs = []
        for (collection, first_id, last_id) in blocks:
            jobs.append(executor.submit(process_block, tmpdir, config,
                                        collection, first_id, last_id))

        widgets = ["Fingerprinting ", progressbar.Percentage(), " ",
                   progressbar.ETA(), " ", progressbar.Bar()]
        pbar = progressbar.ProgressBar(widgets=widgets, maxval=len(jobs))
        for job in pbar(futures.as_completed(jobs)):
            job.result()

This submits all of the fingerprinting jobs to the process pool. The futures.as_completed() function takes an iterable of jobs and yields each one as it finishes, no matter what the submission order was. The ProgressBar wraps that iterator; each time it sees a new item it updates the terminal display with progress information and an ETA, then passes the original object through unchanged. Finally, I call job.result() inside the loop, since .result() will re-raise any exception that happened during the original call.
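(As an aside, here's a small sketch, not taken from the conformer script itself, of the non-blocking and time-limited ways a future can be inspected. done(), exception(), and result(timeout) are all part of the futures API; "handle_failure" is just a hypothetical error handler.)

    # "job" is any future returned by executor.submit().
    if job.done():
        # The call has already finished, so result() will not block.
        if job.exception() is None:
            mol, ids = job.result()
        else:
            handle_failure(job.exception())   # hypothetical error handler
    else:
        # Wait at most 10 seconds; this raises futures.TimeoutError
        # if the result still isn't available by then.
        mol, ids = job.result(timeout=10.0)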
Then if I want the results I iterate over them again:

    for job in jobs:
        ... do something with job.result() ...

BTW, you don't need to keep things around in memory. You can also do things purely asynchronously, should the output order not matter. In that case, the easiest thing to do is to use a callback function, like this:

    def save_conformers(job):
        mol, ids = job.result()
        for id in ids:
            writer.write(mol, confId=id)

    with futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Submit a set of asynchronous jobs
        for mol in suppl:
            if mol:
                job = executor.submit(generateconformations, mol, n)
                job.add_done_callback(save_conformers)

Callback functions tend to be harder for most people to conceptualize. What this does is tell each submitted 'job' to call the function "save_conformers" when it completes. The save_conformers function is called with the job object as its only parameter, and it can itself call .result() to get the result and do something with it.

The above might be useful if some conformers take 10 minutes to generate while most others take 5 seconds. In that case, you start getting output from the other processes even while one of them is stuck for a long time on a single slow task.

Far more than you ever wanted to know on this topic. ;)

Cheers,
Andrew
da...@dalkescientific.com