Re: problem with multiprocessing and defaultdict

2010-01-12 Thread wiso
Robert Kern wrote:

> On 2010-01-11 17:50 PM, wiso wrote:
>> The problem now is this:
>> start reading file r1_200909.log
>> start reading file r1_200910.log
>> readen 488832 lines from file r1_200910.log
>> readen 517247 lines from file r1_200909.log
>> With huge files (the real case) the program freezes. Is there a way to
>> avoid pickling/serialization? For example, something like this:
>> if __name__ == "__main__":
>>     file_names = ["r1_200909.log", "r1_200910.log"]
>>     pool = multiprocessing.Pool(len(file_names))
>>     childrens = [Container(f) for f in file_names]
>>     pool.map(lambda c: c.read(), childrens)
>> PicklingError: Can't pickle <type 'function'>: attribute lookup
>> __builtin__.function failed
> You can't pickle lambda functions.
> What information do you actually need back from the workers?

They send back the object filled with data. The problem is very simple: I
have a container, and the container has a method read(file_name) that reads a
huge file and fills the container with data. I have more than one file to read,
so I want to parallelize this process. The reading method is quite slow
because it involves regexes.
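
A minimal sketch of one way to parallelize this, written for modern Python 3 (the thread uses Python 2.6). It avoids both errors in this thread: the worker is a module-level function, so `pool.map` can pickle it by reference (a lambda cannot be pickled), and it returns a plain `dict` of lists instead of a `Container`, so no custom class has to be rebuilt on the other side. The names `split_odd_even`, `read_file` and `read_all` are hypothetical. The parsed data still has to be pickled and sent back to the parent, so for very large files it may be cheaper to reduce the data inside the worker and return only what the parent actually needs.

```python
import multiprocessing
import sys
from collections import defaultdict


def split_odd_even(lines):
    """Same toy rule as Container.read: odd lines under "odd", even under "even"."""
    data = defaultdict(list)
    for lineno, line in enumerate(lines, start=1):
        data["odd" if lineno % 2 else "even"].append(line.split())
    # Convert to a plain dict: it pickles cheaply and carries no factory function.
    return dict(data)


def read_file(file_name):
    """Worker: runs in a child process and receives only a picklable str."""
    with open(file_name) as f:
        return file_name, split_odd_even(f)


def read_all(file_names, processes=None):
    """Read several files in parallel; returns {file_name: data}."""
    if not file_names:
        return {}
    with multiprocessing.Pool(processes or len(file_names)) as pool:
        # read_file is defined at module level, so it is pickled by name;
        # pool.map(lambda c: ..., ...) fails with a PicklingError instead.
        return dict(pool.map(read_file, file_names))


if __name__ == "__main__":
    for name, data in read_all(sys.argv[1:]).items():
        print(name, {key: len(rows) for key, rows in data.items()})
```

Run it as, for example, `python read_logs.py r1_200909.log r1_200910.log` (the script name is hypothetical).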

Re: problem with multiprocessing and defaultdict

2010-01-11 Thread wiso
Robert Kern wrote:

> On 2010-01-11 17:15 PM, wiso wrote:
>> I'm using a class to read some data from files:
>> import multiprocessing
>> from collections import defaultdict
>>
>> def SingleContainer():
>>     return list()
>>
>> class Container(defaultdict):
>>     """
>>     This class stores odd lines in self["odd"] and even lines in
>>     self["even"]. It is stupid, but it's only an example.
>>     """
>>     def __init__(self, file_name):
>>         if type(file_name) != str:
>>             raise AttributeError, "%s is not a string" % file_name
>>         defaultdict.__init__(self, SingleContainer)
>>         self.file_name = file_name
>>         self.readen_lines = 0
>>     def read(self):
>>         f = open(self.file_name)
>>         print "start reading file %s" % self.file_name
>>         for line in f:
>>             self.readen_lines += 1
>>             values = line.split()
>>             key = {0: "even", 1: "odd"}[self.readen_lines % 2]
>>             self[key].append(values)
>>         print "readen %d lines from file %s" % (self.readen_lines,
>>                                                 self.file_name)
>>
>> """
>> Now I want to read more than one file at a time
>> """
>> def do(file_name):
>>     container = Container(file_name)
>>     container.read()
>>     return container
>>
>> if __name__ == "__main__":
>>     file_names = ["prova_200909.log", "prova_200910.log"]
>>     pool = multiprocessing.Pool(len(file_names))
>>     result = pool.map(do, file_names)
>>     pool.close()
>>     pool.join()
>>     print "Finish"
>> but I got:
>> start reading file prova_200909.log
>> start reading file prova_200910.log
>> readen 142 lines from file prova_200909.log
>> readen 160 lines from file prova_200910.log
>> Exception in thread Thread-2:
>> Traceback (most recent call last):
>>   File "/usr/lib64/python2.6/threading.py", line 522, in __bootstrap_inner
>>   File "/usr/lib64/python2.6/threading.py", line 477, in run
>>     self.__target(*self.__args, **self.__kwargs)
>>   File "/usr/lib64/python2.6/multiprocessing/pool.py", line 259, in _handle_results
>>     task = get()
>>   File "", line 11, in __init__
>>     raise AttributeError, "%s is not a string" % file_name
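>> AttributeError: (AttributeError('<function SingleContainer at
>> 0x7f08b253d938> is not a string',), <class '__main__.Container'>,
>> (<function SingleContainer at 0x7f08b253d938>,))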
>> The problem occurs when the pool shares objects, but why is it calling
>> Container.__init__ with a Container parameter?
> When you return the container from do() in the worker process, it must be
> pickled in order to be sent over the socket. You did not override the
> implementation of the .__reduce_ex__() method, so it used defaultdict's
> version which passes the factory function as the first argument to the
> constructor.
> I would recommend passing back the container.items() list instead of a
> Container instance as the easiest path forward.

Thank you very much. I changed
    return container
to
    return container.items()
and it works:

start reading file prova_200909.log
readen 142 lines from file prova_200909.log
start reading file prova_200910.log
readen 160 lines from file prova_200910.log
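
If the receiving side really needs a `Container` rather than `container.items()`, one option (a Python 3 sketch, not what was suggested in the thread) is to give the class its own `__reduce__`, so that unpickling rebuilds it through `Container(file_name)` instead of defaultdict's `Container(default_factory)`. `object.__reduce_ex__` delegates to `__reduce__` whenever a class overrides it, which is also how defaultdict's own version gets used. Differences from the original, all assumptions: the factory is `list` instead of `SingleContainer`, and the type check raises `TypeError`.

```python
import pickle
from collections import defaultdict


class Container(defaultdict):
    """Toy container from the thread, made picklable."""

    def __init__(self, file_name):
        if not isinstance(file_name, str):
            raise TypeError("%r is not a string" % (file_name,))
        defaultdict.__init__(self, list)
        self.file_name = file_name
        self.readen_lines = 0

    def __reduce__(self):
        # (callable, args, state, listitems, dictitems): unpickling calls
        # Container(self.file_name), updates the instance __dict__ with
        # `state`, then sets each (key, value) pair from dictitems.
        return (type(self), (self.file_name,), self.__dict__, None,
                iter(self.items()))


c = Container("prova_200909.log")
c["odd"].append(["first", "line"])
c.readen_lines = 1
restored = pickle.loads(pickle.dumps(c))
print(type(restored).__name__, restored.file_name, restored.readen_lines, dict(restored))
# Container prova_200909.log 1 {'odd': [['first', 'line']]}
```

This only fixes the reconstruction; the whole container is still serialized, so it does not make transferring very large results any cheaper.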

The problem now is this:
start reading file r1_200909.log
start reading file r1_200910.log
readen 488832 lines from file r1_200910.log
readen 517247 lines from file r1_200909.log

With huge files (the real case) the program freezes. Is there a way to
avoid pickling/serialization? For example, something like this:

if __name__ == "__main__":
    file_names = ["r1_200909.log", "r1_200910.log"]
    pool = multiprocessing.Pool(len(file_names))
    childrens = [Container(f) for f in file_names]
    pool.map(lambda c: c.read(), childrens)

PicklingError: Can't pickle <type 'function'>: attribute lookup
__builtin__.function failed


problem with multiprocessing and defaultdict

2010-01-11 Thread wiso
I'm using a class to read some data from files:

import multiprocessing
from collections import defaultdict

def SingleContainer():
    return list()

class Container(defaultdict):
    """
    This class stores odd lines in self["odd"] and even lines in self["even"].
    It is stupid, but it's only an example.
    """
    def __init__(self, file_name):
        if type(file_name) != str:
            raise AttributeError, "%s is not a string" % file_name
        defaultdict.__init__(self, SingleContainer)
        self.file_name = file_name
        self.readen_lines = 0
    def read(self):
        f = open(self.file_name)
        print "start reading file %s" % self.file_name
        for line in f:
            self.readen_lines += 1
            values = line.split()
            key = {0: "even", 1: "odd"}[self.readen_lines % 2]
            self[key].append(values)
        print "readen %d lines from file %s" % (self.readen_lines,
                                                self.file_name)

"""
Now I want to read more than one file at a time
"""

def do(file_name):
    container = Container(file_name)
    container.read()
    return container

if __name__ == "__main__":
    file_names = ["prova_200909.log", "prova_200910.log"]
    pool = multiprocessing.Pool(len(file_names))
    result = pool.map(do, file_names)
    pool.close()
    pool.join()
    print "Finish"

but I got:
start reading file prova_200909.log
start reading file prova_200910.log
readen 142 lines from file prova_200909.log
readen 160 lines from file prova_200910.log
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib64/python2.6/threading.py", line 522, in __bootstrap_inner
  File "/usr/lib64/python2.6/threading.py", line 477, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.6/multiprocessing/pool.py", line 259, in _handle_results
    task = get()
  File "", line 11, in __init__
    raise AttributeError, "%s is not a string" % file_name
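AttributeError: (AttributeError('<function SingleContainer at 0x7f08b253d938> is not a string',), <class '__main__.Container'>, (<function SingleContainer at 0x7f08b253d938>,))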

The problem occurs when the pool shares objects, but why is it calling
Container.__init__ with a Container parameter?

monitor reading file with thread

2010-01-08 Thread wiso
I'm reading and processing a huge file, so during the execution I want to
know the state of the processing: how many lines have already been processed,
and so on. The first approach is:

f = open(filename)
n = 0
for l in f:
    n += 1
    if n % 1000 == 0:
        print "Reading %d lines" % n

But I want something different: I want to monitor the progress of the
computation every n seconds. It's the first time I've used threading, and I
don't think my solution is the best one:

import threading
import time
import Queue

class Reader(object):
    def __init__(self, filename):
        self.filename = filename
        self.lineno = 0
    def __iter__(self):
        f = open(self.filename)
        for line in f:
            self.lineno += 1
            time.sleep(0.01) # slow down
            yield line

class Monitor(threading.Thread):
    def __init__(self, reader, stop_queue, interval=2):
        threading.Thread.__init__(self)
        self.interval = interval
        self.reader = reader
        self.stop_queue = stop_queue
    def run(self):
        while True:
            try:
                if self.stop_queue.get(timeout=self.interval) == "stop":
                    break
            except Queue.Empty:
                print "MONITOR: ", self.reader.lineno

reader = Reader("r1_200910.log")
q = Queue.Queue()
monitor = Monitor(reader, q)
monitor.start()

for line in reader:
    pass # do_somethinghard(line)

q.put("stop")
monitor.join()

It works, but I don't like how I'm stopping the thread. In general the
Monitor class can be more complex, for example a progress bar. Using python
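
One common way to stop such a thread cleanly is a `threading.Event` instead of a queue used as a signal: `Event.wait(timeout)` returns `False` when the timeout expires and `True` as soon as the event is set, so the monitor reports every `interval` seconds but exits promptly when asked. (In Python 2.6 `wait()` always returned `None`; the return value is usable from 2.7 on.) A minimal Python 3 sketch; the `get_progress` and `report` callables are assumptions that keep the monitor independent of `Reader`, so a progress bar could be plugged in as `report`.

```python
import threading
import time


class Monitor(threading.Thread):
    """Call report(...) with get_progress() every `interval` seconds until stopped."""

    def __init__(self, get_progress, interval=2.0, report=print):
        super().__init__(daemon=True)  # never keeps the program alive on its own
        self.get_progress = get_progress
        self.interval = interval
        self.report = report
        self.stop_requested = threading.Event()

    def run(self):
        # wait() returns False on timeout (time to report) and True once
        # stop() has set the event, which ends the loop immediately.
        while not self.stop_requested.wait(self.interval):
            self.report("MONITOR: %d" % self.get_progress())

    def stop(self):
        self.stop_requested.set()
        self.join()


if __name__ == "__main__":
    progress = {"lineno": 0}  # hypothetical stand-in for Reader.lineno
    monitor = Monitor(lambda: progress["lineno"], interval=0.25)
    monitor.start()
    for _ in range(100):  # stands in for "for line in reader:"
        progress["lineno"] += 1
        time.sleep(0.01)
    monitor.stop()
```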

Re: Convert month name to month number faster

2010-01-06 Thread wiso
Antoine Pitrou wrote:

> On Wed, 06 Jan 2010 12:03:36 +0100, wiso wrote:
>> from time import time
>> t = time(); xxx=map(to_dict,l); print time() - t # 0.5
>> t = time(); xxx=map(to_if,l); print time() - t   # 1.0
> Don't define your own function just for attribute access. Instead just
> write:
> xxx = map(month_dict.__getitem__, l)

t = time(); xxx=map(month_dict.__getitem__,l); print time() - t # 0.2

month_list = 

t = time(); xxx=map(month_list.index,l); print time() - t # 0.6
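
A single `time()` measurement over 100 items is dominated by noise; the standard `timeit` module repeats the measurement and is a more reliable way to compare the candidates. A sketch in Python 3, where `map` is lazy and has to be consumed with `list()`. The full `month_list` is not shown above, so the padded list here (index 0 is an empty string so that `.index()` returns 1 to 12) is an assumption, as are the `converters` and `compare` names.

```python
import random
import timeit

MONTHS = "Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec".split()
month_dict = {name: number for number, name in enumerate(MONTHS, start=1)}
month_list = [""] + MONTHS  # assumption: padded so .index() gives 1..12


def to_dict(name):
    return month_dict[name]


# Each converter maps a list of month names to a list of month numbers.
converters = {
    "to_dict": lambda data: list(map(to_dict, data)),
    "month_dict.__getitem__": lambda data: list(map(month_dict.__getitem__, data)),
    "month_list.index": lambda data: list(map(month_list.index, data)),
}


def compare(n_items=100, repeat=5, number=1000):
    """Return the best total time (seconds for `number` runs) per converter."""
    data = [random.choice(MONTHS) for _ in range(n_items)]
    return {
        name: min(timeit.repeat(lambda: convert(data), repeat=repeat, number=number))
        for name, convert in converters.items()
    }


if __name__ == "__main__":
    for name, seconds in sorted(compare().items(), key=lambda item: item[1]):
        print("%-24s %.4f s" % (name, seconds))
```

Taking the minimum of the repeats is the usual choice, since slower runs mostly reflect interference from other processes rather than the code itself.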

Convert month name to month number faster

2010-01-06 Thread wiso
I'm optimizing the innermost loop of my script. I need to convert month
names to month numbers. I'm using Python 2.6 on Linux x64.

month_dict = {"Jan":1, "Feb":2, "Mar":3, "Apr":4, "May":5, "Jun":6,
              "Jul":7, "Aug":8, "Sep":9, "Oct":10, "Nov":11, "Dec":12}

def to_dict(name):
    return month_dict[name]

def to_if(name):
    if name == "Jan": return 1
    elif name == "Feb": return 2
    elif name == "Mar": return 3
    elif name == "Apr": return 4
    elif name == "May": return 5
    elif name == "Jun": return 6
    elif name == "Jul": return 7
    elif name == "Aug": return 8
    elif name == "Sep": return 9
    elif name == "Oct": return 10
    elif name == "Nov": return 11
    elif name == "Dec": return 12
    else: raise ValueError

import random
l = [random.choice(month_dict.keys()) for _ in range(100)]

from time import time
t = time(); xxx=map(to_dict,l); print time() - t # 0.5
t = time(); xxx=map(to_if,l); print time() - t   # 1.0

Is there a faster solution? Maybe something with str.translate?

The problem is a little different because I don't read random data, but 
sorted data. For example:

l = [x for x in 
for _ in range(1000)] # ["Jan","Jan", ..., "Feb", "Feb", ...]

So maybe the to_if approach will be faster if I write the cases in the best
order. Look:

l = ["Jan"] * 100 # to_if is in the best order for "Jan"
t = time(); xxx=map(to_dict,l); print time() - t # 0.5
t = time(); xxx=map(to_if,l); print time() - t # 0.5
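
Because the real input is sorted, each run of identical names can be converted with one lookup per run rather than one per item. A sketch using `itertools.groupby` (Python 3; `months_to_numbers` and `MONTH_NUMBER` are hypothetical names). Whether it actually beats `map(month_dict.__getitem__, l)` depends on how long the runs are, so it is worth measuring on the real data.

```python
from itertools import groupby

MONTHS = "Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec".split()
MONTH_NUMBER = {name: number for number, name in enumerate(MONTHS, start=1)}


def months_to_numbers(names):
    """Convert month abbreviations to 1..12, one dict lookup per run."""
    result = []
    for name, run in groupby(names):
        number = MONTH_NUMBER[name]  # KeyError for an unknown name
        # Count the run and repeat the number instead of looking it up again.
        result.extend([number] * sum(1 for _ in run))
    return result


sorted_names = [m for m in MONTHS for _ in range(3)]  # Jan, Jan, Jan, Feb, ...
print(months_to_numbers(sorted_names)[:7])  # [1, 1, 1, 2, 2, 2, 3]
```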


FileInput too slow

2010-01-04 Thread wiso
I'm trying the fileinput module and I like it, but I don't understand why
it's so slow. Look:

from time import time
from fileinput import FileInput

file = ['r1_200907.log', 'r1_200908.log', 'r1_200909.log', 'r1_200910.log', 

def f1():
    n = 0
    for f in file:
        print "new file: %s" % f
        ff = open(f)
        for line in ff:
            n += 1
    return n

def f2():
    f = FileInput(file)
    for line in f:
        if f.isfirstline(): print "new file: %s" % f.filename()
    return f.lineno()

def f3(): # f2 simpler
    f = FileInput(file)
    for line in f:
        pass
    return f.lineno()

t = time(); f1(); print time()-t # 1.0
t = time(); f2(); print time()-t # 7.0 !!!
t = time(); f3(); print time()-t # 5.5

I'm using text files; there are 2563150 lines in total.
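
`fileinput` is written in pure Python, and every line passes through a Python-level method that updates its line counters and handles file switching; that per-line overhead likely accounts for much of the gap to `f1`, where the loop runs directly over the file object. If only the "new file" notification is needed, a small generator keeps most of the plain-loop speed. A Python 3 sketch; `iter_lines`, `count_lines` and the `on_new_file` callback are hypothetical names.

```python
import sys


def iter_lines(file_names, on_new_file=None):
    """Yield the lines of several files in order, like a minimal fileinput.

    on_new_file(name) is called before the first line of each file, playing
    the role of the isfirstline() check in f2.
    """
    for name in file_names:
        if on_new_file is not None:
            on_new_file(name)
        with open(name) as f:
            # Iterate the file object directly: no per-line bookkeeping
            # beyond what the built-in file iterator already does.
            yield from f


def count_lines(file_names):
    return sum(1 for _ in iter_lines(file_names))


if __name__ == "__main__":
    print(count_lines(sys.argv[1:]))
```

Unlike `FileInput`, this does not track `lineno()` or `filename()` for the caller; anything beyond the callback has to be counted in the loop itself.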

google tech talk code (threading module)

2009-09-08 Thread wiso
I took a little code from a Google Tech Talk. It seems interesting, but it
doesn't work:

import sys, urllib, os, threading, Queue

q = Queue.Queue()

class RetrWorker(threading.Thread):
    def run(self):
        self.setDaemon(True)
        def hook(*a): print (fn, a)
        while True:
            url = q.get()
            fn = os.path.basename(url)
            print url, "->", fn
            urllib.urlretrieve(url, fn, hook)

for i in range(10): RetrWorker().start()
for url in sys.argv[1:]: q.put(url)

Exception in thread Thread-10:
Traceback (most recent call last):
  File "/usr/lib64/python2.6/threading.py", line 522, in __bootstrap_inner
  File "", line 7, in run
  File "/usr/lib64/python2.6/threading.py", line 690, in setDaemon
    self.daemon = daemonic
  File "/usr/lib64/python2.6/threading.py", line 683, in daemon
    raise RuntimeError("cannot set daemon status of active thread");
RuntimeError: cannot set daemon status of active thread
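
The traceback points at the cause: the daemon flag (`setDaemon()` or the `daemon` attribute) can only be changed before `start()`, and `run()` already executes inside the started thread. A Python 3 sketch of the same downloader with the flag set in the constructor. The `jobs` queue argument, the injectable `fetch` parameter (handy for testing without network access) and the `task_done()`/`join()` bookkeeping are additions; the progress `hook` is dropped for brevity.

```python
import os
import queue
import sys
import threading
import urllib.request


class RetrWorker(threading.Thread):
    def __init__(self, jobs, fetch=urllib.request.urlretrieve):
        # The daemon flag must be set before start(); doing it inside run()
        # is what raised "cannot set daemon status of active thread".
        super().__init__(daemon=True)
        self.jobs = jobs
        self.fetch = fetch

    def run(self):
        while True:
            url = self.jobs.get()
            fn = os.path.basename(url)
            print(url, "->", fn)
            try:
                self.fetch(url, fn)
            except Exception as exc:  # keep the worker alive for the next URL
                print("failed:", url, exc)
            finally:
                self.jobs.task_done()


if __name__ == "__main__":
    jobs = queue.Queue()
    for _ in range(10):
        RetrWorker(jobs).start()
    for url in sys.argv[1:]:
        jobs.put(url)
    jobs.join()  # wait for every URL; the daemon workers then exit with the program
```

Without `jobs.join()`, the main thread would finish right after queueing the URLs and the daemon workers would be killed mid-download.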
