EL 2012/5/8 Daπid <davidmen...@gmail.com>: > Esto es lo que tengo ahora: > > http://pastebin.com/pa2dtNuN > > Y esto lo que obtengo: > > Started! > Standard map: > 1 , 2 > --- > 2 , 4 > --- > > Parallel map > ==Saving== > [1, 2] > Process Process-3: > Traceback (most recent call last): > File "C:\Python26\lib\multiprocessing\process.py", line 232, in _bootstrap > self.run() > File "C:\Python26\lib\multiprocessing\process.py", line 88, in run > self._target(*self._args, **self._kwargs) > File "F:\Mis > documentos\eclipse\Research\Networkx\src\Statistics\Mod_infty_II-b\Parallel\paralell_test.py", > line 35, in saving > savefile.write(item[0]) > ValueError: I/O operation on closed file > 2 , 4 > 1 , 2 > 1 > End! > > Ocurre lo siguiente: > > - El map normal funciona, y es capaz de poner objetos en la cola sin > problemas. > - Desde el map de multithreading los hilos se atascan al llegar al > q.put (no llegan a ponerlo en la cola), pero sin embargo, el pool.join > se desbloquea. > - El proceso de guardado no es capaz de acceder al fichero, al aparecer > cerrado. > > ¿Alguna idea?
Creo que tienes un problema de tiempos, que no dejas que terminen los procesos por sí sólos. Además de ésto, no te fíes del 'print' para comprobar los resultados de los procesos puesto que el módulo 'multiprocessing' cachea la salida y que sólo muestra cuando el buffer se llena o se invoca explícitamente el 'sys.stdout.flush()' El guión de lo que sucese podría ser el siguiente: LLenado del queue de datos: print 'Standard map:' map(calculate, pars) print Creación del pool: print 'Parallel map' pool = Pool(processes=min(ncpu, len(pars))) pool.map_async(calculate, pars, chunksize=1) pool.close() Invocación del proceso "saving": k=Process(target=saving, args=(q,savefile,)) k.start() Prácticamente se ejecuta al instante con los datos del queue guardados con anterioridad a la creación del pool. En el proceso "saving" se produce una excepción al quedar vacia la queue, imprimiendo un "#" (que no ves por el cacheo del stdout comentado) y queda en espera (time.sleep(0.5)). Ejecución de los procesos del pool: pool.join() Prácticamente se ejecuta al instante, como el caso del proceso "saving". Se llena el queue de nuevo, pero el proceso "saving" sigue a la espera durante un rato (time.sleep(0.5)) Finalización del queue: print q.qsize() q.close() q.join_thread() Aquí, el proceso principal espera hasta que el queue se vacíe del todo. Simultánemente al momento en que el proceso "saving" obtenga el último dato de la queue, el proceso principal ejecuta lo siguiente: k.terminate() pool.terminate() O sea, el proceso "saving" prácticamente se ve interrumpido en el mismo momento que extrae el último dato de la queue, que en el caso de windows se nota por un fallo general de las I/O. SOLUCIÓN: emplea JoinableQueue . En este tipo de queue hay que emparejar cada get() con un task_done(), un modo de señalizar que el procesado del dato ha acabado. def saving(q, savefile): print '==Saving==' while True: item=q.get() print item print >>savefile, item[0],',',item[1],'\r\n' q.task_done() if __name__=="__main__": print 'Started!' q=JoinableQueue() .... q.close() q.join() -- Hyperreals *R: http://ch3m4.org/blog Quarks, bits y otras criaturas infinitesimales _______________________________________________ Python-es mailing list Python-es@python.org http://mail.python.org/mailman/listinfo/python-es FAQ: http://python-es-faq.wikidot.com/