Re: problem with multiprocessing and defaultdict

2010-01-12 Thread wiso
Robert Kern wrote:

 On 2010-01-11 17:50 PM, wiso wrote:
 
 The problem now is this:
 start reading file r1_200909.log
 start reading file r1_200910.log
 readen 488832 lines from file r1_200910.log
 readen 517247 lines from file r1_200909.log

 with huge files (the real case) the program freezes. Is there a solution to
 avoid pickling/serialization, ... for example something like this:

 if __name__ == "__main__":
     file_names = ["r1_200909.log", "r1_200910.log"]
     pool = multiprocessing.Pool(len(file_names))
     childrens = [Container(f) for f in file_names]
     pool.map(lambda c: c.read(), childrens)

 PicklingError: Can't pickle <type 'function'>: attribute lookup
 __builtin__.function failed
 
 You can't pickle lambda functions.
 
 What information do you actually need back from the workers?
 

They sent back the object filled with data. The problem is very simple: I
have a container; the container has a method read(file_name) that reads a
huge file and fills the container with data. I have more than one file to
read, so I want to parallelize this process. The reading method is quite
slow because it involves regexes.
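
As a minimal sketch of that flow (assuming the Container class from the
original post is importable), each worker can return a plain items() list,
which pickles cleanly, and the parent can merge the per-file results:

import multiprocessing
from collections import defaultdict
# from main2 import Container   # hypothetical import of the thread's class

def do(file_name):
    container = Container(file_name)
    container.read()
    return container.items()   # a plain list of (key, rows) pairs pickles fine

if __name__ == "__main__":
    file_names = ["r1_200909.log", "r1_200910.log"]
    pool = multiprocessing.Pool(len(file_names))
    merged = defaultdict(list)
    for items in pool.map(do, file_names):
        for key, rows in items:
            merged[key].extend(rows)   # combine per-file results in the parent
    pool.close()
    pool.join()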
-- 
http://mail.python.org/mailman/listinfo/python-list


Re: problem with multiprocessing and defaultdict

2010-01-12 Thread Wolodja Wentland
On Tue, Jan 12, 2010 at 11:48 +0100, wiso wrote:
 They sent back the object filled with data. The problem is very simple: I
 have a container; the container has a method read(file_name) that reads a
 huge file and fills the container with data. I have more than one file to
 read, so I want to parallelize this process. The reading method is quite
 slow because it involves regexes.

Take a look at multiprocessing.Manager and use one to proxy access to a
*shared* container from all processes.

If your container is a dict it is as easy as:

manager = multiprocessing.Manager()
managed_dict = manager.dict()
...
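
A fuller sketch of that idea, with a hypothetical worker function and the
file names from this thread (proxies pickle by reconnecting to the manager,
so the managed dict can be passed straight to pool workers):

import multiprocessing

def worker(args):
    # hypothetical helper: parse one file locally, then publish the result
    # with a single proxy call to keep manager round-trips to a minimum
    file_name, shared = args
    counts = {"even": [], "odd": []}
    f = open(file_name)
    n = 0
    for line in f:
        n += 1
        counts["odd" if n % 2 else "even"].append(line.split())
    f.close()
    shared[file_name] = counts

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    shared = manager.dict()          # the *shared*, proxied container
    file_names = ["r1_200909.log", "r1_200910.log"]
    pool = multiprocessing.Pool(len(file_names))
    pool.map(worker, [(f, shared) for f in file_names])
    pool.close()
    pool.join()
    print shared.items()

Every operation on the proxy is a round-trip to the manager process, so it
pays to aggregate locally and assign once per file rather than appending
line by line through the proxy.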

-- 
  .''`. Wolodja Wentland       wentl...@cl.uni-heidelberg.de
 : :'  :
 `. `'` 4096R/CAF14EFC 
   `-   081C B7CD FF04 2BA9 94EA  36B2 8B7F 7D30 CAF1 4EFC


-- 
http://mail.python.org/mailman/listinfo/python-list


problem with multiprocessing and defaultdict

2010-01-11 Thread wiso
I'm using a class to read some data from files:

import multiprocessing
from collections import defaultdict

def SingleContainer():
    return list()


class Container(defaultdict):
    """
    this class stores odd lines in self["odd"] and even lines in self["even"].
    It is stupid, but it's only an example.
    """
    def __init__(self, file_name):
        if type(file_name) != str:
            raise AttributeError, "%s is not a string" % file_name
        defaultdict.__init__(self, SingleContainer)
        self.file_name = file_name
        self.readen_lines = 0
    def read(self):
        f = open(self.file_name)
        print "start reading file %s" % self.file_name
        for line in f:
            self.readen_lines += 1
            values = line.split()
            key = {0: "even", 1: "odd"}[self.readen_lines % 2]
            self[key].append(values)
        print "readen %d lines from file %s" % (self.readen_lines,
                                                self.file_name)


Now I want to read more than one file at a time:


def do(file_name):
    container = Container(file_name)
    container.read()
    return container

if __name__ == "__main__":
    file_names = ["prova_200909.log", "prova_200910.log"]
    pool = multiprocessing.Pool(len(file_names))
    result = pool.map(do, file_names)
    pool.close()
    pool.join()
    print "Finish"



but I got:
start reading file prova_200909.log
start reading file prova_200910.log
readen 142 lines from file prova_200909.log
readen 160 lines from file prova_200910.log
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib64/python2.6/threading.py", line 522, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.6/threading.py", line 477, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.6/multiprocessing/pool.py", line 259, in _handle_results
    task = get()
  File "main2.py", line 11, in __init__
    raise AttributeError, "%s is not a string" % file_name
AttributeError: (AttributeError('<function SingleContainer at 0x7f08b253d938> is not a string',), <class '__main__.Container'>, (<function SingleContainer at 0x7f08b253d938>,))


the problem shows up when the pool passes objects between processes, but why
is it calling Container.__init__ with the SingleContainer function as its
argument?
-- 
http://mail.python.org/mailman/listinfo/python-list


Re: problem with multiprocessing and defaultdict

2010-01-11 Thread Robert Kern

On 2010-01-11 17:15 PM, wiso wrote:

I'm using a class to read some data from files:

import multiprocessing
from collections import defaultdict

def SingleContainer():
    return list()


class Container(defaultdict):
    """
    this class stores odd lines in self["odd"] and even lines in self["even"].
    It is stupid, but it's only an example.
    """
    def __init__(self, file_name):
        if type(file_name) != str:
            raise AttributeError, "%s is not a string" % file_name
        defaultdict.__init__(self, SingleContainer)
        self.file_name = file_name
        self.readen_lines = 0
    def read(self):
        f = open(self.file_name)
        print "start reading file %s" % self.file_name
        for line in f:
            self.readen_lines += 1
            values = line.split()
            key = {0: "even", 1: "odd"}[self.readen_lines % 2]
            self[key].append(values)
        print "readen %d lines from file %s" % (self.readen_lines,
                                                self.file_name)


Now I want to read more than one file at a time:


def do(file_name):
    container = Container(file_name)
    container.read()
    return container

if __name__ == "__main__":
    file_names = ["prova_200909.log", "prova_200910.log"]
    pool = multiprocessing.Pool(len(file_names))
    result = pool.map(do, file_names)
    pool.close()
    pool.join()
    print "Finish"



but I got:
start reading file prova_200909.log
start reading file prova_200910.log
readen 142 lines from file prova_200909.log
readen 160 lines from file prova_200910.log
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib64/python2.6/threading.py", line 522, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.6/threading.py", line 477, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.6/multiprocessing/pool.py", line 259, in _handle_results
    task = get()
  File "main2.py", line 11, in __init__
    raise AttributeError, "%s is not a string" % file_name
AttributeError: (AttributeError('<function SingleContainer at 0x7f08b253d938> is not a string',), <class '__main__.Container'>, (<function SingleContainer at 0x7f08b253d938>,))


the problem shows up when the pool passes objects between processes, but why
is it calling Container.__init__ with the SingleContainer function as its
argument?


When you return the container from do() in the worker process, it must be 
pickled in order to be sent over the socket. You did not override the 
implementation of the .__reduce_ex__() method, so it used defaultdict's version 
which passes the factory function as the first argument to the constructor.


I would recommend passing back the container.items() list instead of a Container 
instance as the easiest path forward.
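
An alternative, if Container instances themselves must cross the process
boundary, is to override __reduce__ so that pickling rebuilds the object via
Container(file_name) instead of defaultdict's default reconstruction. A
minimal, untested sketch against the class from the original post:

from collections import defaultdict

def SingleContainer():
    return list()

class Container(defaultdict):
    def __init__(self, file_name):
        if type(file_name) != str:
            raise AttributeError, "%s is not a string" % file_name
        defaultdict.__init__(self, SingleContainer)
        self.file_name = file_name
        self.readen_lines = 0

    def __reduce__(self):
        # Rebuild as Container(file_name) instead of the inherited
        # Container(default_factory); restore the instance attributes from
        # __dict__, then let pickle re-insert the mapping items one by one.
        return (type(self), (self.file_name,),
                self.__dict__, None, iter(self.items()))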


--
Robert Kern

I have come to believe that the whole world is an enigma, a harmless enigma
 that is made terrible by our own mad attempt to interpret it as though it had
 an underlying truth.
  -- Umberto Eco

--
http://mail.python.org/mailman/listinfo/python-list


Re: problem with multiprocessing and defaultdict

2010-01-11 Thread wiso
Robert Kern wrote:

 On 2010-01-11 17:15 PM, wiso wrote:
 I'm using a class to read some data from files:

 import multiprocessing
 from collections import defaultdict

def SingleContainer():
    return list()


class Container(defaultdict):
    """
    this class stores odd lines in self["odd"] and even lines in self["even"].
    It is stupid, but it's only an example.
    """
    def __init__(self, file_name):
        if type(file_name) != str:
            raise AttributeError, "%s is not a string" % file_name
        defaultdict.__init__(self, SingleContainer)
        self.file_name = file_name
        self.readen_lines = 0
    def read(self):
        f = open(self.file_name)
        print "start reading file %s" % self.file_name
        for line in f:
            self.readen_lines += 1
            values = line.split()
            key = {0: "even", 1: "odd"}[self.readen_lines % 2]
            self[key].append(values)
        print "readen %d lines from file %s" % (self.readen_lines,
                                                self.file_name)

 
Now I want to read more than one file at a time:
 

def do(file_name):
    container = Container(file_name)
    container.read()
    return container

if __name__ == "__main__":
    file_names = ["prova_200909.log", "prova_200910.log"]
    pool = multiprocessing.Pool(len(file_names))
    result = pool.map(do, file_names)
    pool.close()
    pool.join()
    print "Finish"



 but I got:
 start reading file prova_200909.log
 start reading file prova_200910.log
 readen 142 lines from file prova_200909.log
 readen 160 lines from file prova_200910.log
 Exception in thread Thread-2:
 Traceback (most recent call last):
  File "/usr/lib64/python2.6/threading.py", line 522, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.6/threading.py", line 477, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.6/multiprocessing/pool.py", line 259, in _handle_results
    task = get()
  File "main2.py", line 11, in __init__
    raise AttributeError, "%s is not a string" % file_name
AttributeError: (AttributeError('<function SingleContainer at 0x7f08b253d938> is not a string',), <class '__main__.Container'>, (<function SingleContainer at 0x7f08b253d938>,))


the problem shows up when the pool passes objects between processes, but why
is it calling Container.__init__ with the SingleContainer function as its
argument?
 
 When you return the container from do() in the worker process, it must be
 pickled in order to be sent over the socket. You did not override the
 implementation of the .__reduce_ex__() method, so it used defaultdict's
 version which passes the factory function as the first argument to the
 constructor.
 
 I would recommend passing back the container.items() list instead of a
 Container instance as the easiest path forward.
 

Thank you very much. I changed

    return container

to

    return container.items()

and it works:

start reading file prova_200909.log
readen 142 lines from file prova_200909.log
start reading file prova_200910.log
readen 160 lines from file prova_200910.log
Finish

The problem now is this:
start reading file r1_200909.log
start reading file r1_200910.log
readen 488832 lines from file r1_200910.log
readen 517247 lines from file r1_200909.log

with huge files (the real case) the program freezes. Is there a solution to
avoid pickling/serialization, ... for example something like this:

if __name__ == "__main__":
    file_names = ["r1_200909.log", "r1_200910.log"]
    pool = multiprocessing.Pool(len(file_names))
    childrens = [Container(f) for f in file_names]
    pool.map(lambda c: c.read(), childrens)

PicklingError: Can't pickle <type 'function'>: attribute lookup
__builtin__.function failed

-- 
http://mail.python.org/mailman/listinfo/python-list


Re: problem with multiprocessing and defaultdict

2010-01-11 Thread Robert Kern

On 2010-01-11 17:50 PM, wiso wrote:


The problem now is this:
start reading file r1_200909.log
start reading file r1_200910.log
readen 488832 lines from file r1_200910.log
readen 517247 lines from file r1_200909.log

with huge files (the real case) the program freezes. Is there a solution to
avoid pickling/serialization, ... for example something like this:

if __name__ == "__main__":
    file_names = ["r1_200909.log", "r1_200910.log"]
    pool = multiprocessing.Pool(len(file_names))
    childrens = [Container(f) for f in file_names]
    pool.map(lambda c: c.read(), childrens)

PicklingError: Can't pickle <type 'function'>: attribute lookup
__builtin__.function failed


You can't pickle lambda functions.

What information do you actually need back from the workers?
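
A common workaround, sketched here with the thread's Container class assumed
importable: use a module-level function, which pickles by its qualified name,
and have it return plain data. Passing pre-built Container instances to map()
would hit the same defaultdict pickling problem, so the worker constructs its
own:

import multiprocessing
# from main2 import Container   # hypothetical import of the thread's class

def read_items(file_name):
    # module-level functions pickle by name, unlike lambdas
    container = Container(file_name)
    container.read()
    return container.items()     # send back plain data, not the Container

if __name__ == "__main__":
    file_names = ["r1_200909.log", "r1_200910.log"]
    pool = multiprocessing.Pool(len(file_names))
    results = pool.map(read_items, file_names)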

--
Robert Kern

I have come to believe that the whole world is an enigma, a harmless enigma
 that is made terrible by our own mad attempt to interpret it as though it had
 an underlying truth.
  -- Umberto Eco

--
http://mail.python.org/mailman/listinfo/python-list