I am trying to allow multiple threads read/write access to pytables data
and found it is necessary to call flush() before any read. If not, the
latest data is not returned. However, this can cause a RuntimeError. I
have tried protecting pytables access with both locks and queues as done by
joshayers (
https://github.com/PyTables/PyTables/blob/develop/examples/multiprocess_access_queues.py).
In either case I still get "RuntimeError: dictionary changed size during
iteration" when doing the flush. (Incidentally, using the locks appears to
be much faster than using queues in my unscientific tests.)
I have tried versions 2.4 and 2.3.1 with the same results. Interestingly,
this only appears to happen if there are multiple tables/groups in the H5
file. To investigate this behavior further I created a test program to
illustrate it (below). When run with num_groups = 5 and num_tables = 5 (or
greater) I see the runtime error every time. When these values are smaller,
the error does not occur (at least in a short test period).
I might be doing something unexpected with pytables, but this seems pretty
straightforward to me. Any help is appreciated.
import functools
import random
import threading
import time

import tables
# Module-wide lock used to serialize the decorated methods below.
lock = threading.Lock()


def synchronized(lock):
    """Return a decorator that runs the wrapped callable while holding *lock*.

    The lock is acquired before the call and released afterwards, whether
    the call returns normally or raises. The wrapped callable's return
    value (or exception) is passed through unchanged.

    Note: if *lock* is a plain ``threading.Lock`` it is not reentrant, so a
    synchronized callable must not call another callable synchronized on
    the same lock.
    """
    def wrap(f):
        # Preserve the wrapped function's __name__/__doc__ for debugging.
        @functools.wraps(f)
        def newFunction(*args, **kw):
            with lock:
                return f(*args, **kw)
        return newFunction
    return wrap
class TableValue(tables.IsDescription):
    # Row description shared by every table this script creates.
    # The ``pos`` arguments give the columns an explicit ordering.
    a = tables.Int64Col(pos=1)
    b = tables.UInt32Col(pos=2)
class Test(object):
    """Owns one HDF5 file with several groups of indexed tables.

    ``write`` and ``read`` are serialized through the module-level ``lock``
    so they can be called from multiple threads. ``stats`` counts how many
    reads and writes have completed.
    """

    def __init__(self, path='/data/test.h5', num_groups=5, num_tables=5):
        """Create the file at *path* with ``num_groups`` x ``num_tables`` tables.

        The defaults reproduce the original hard-coded configuration. The
        file is opened with mode ``'w'``. Column ``a`` of every table gets
        an index.
        """
        # Assigned before opening so __del__ sees a defined attribute even
        # if openFile raises.
        self.h5 = None
        self.h5 = tables.openFile(path, mode='w')
        self.num_groups = num_groups
        self.num_tables = num_tables
        self.groups = [self.h5.createGroup('/', "group%d" % i)
                       for i in range(self.num_groups)]
        # self.tables[g][t] is table number t inside group number g.
        self.tables = []
        for group in self.groups:
            tbls = [self.h5.createTable(group, 'table%d' % i, TableValue)
                    for i in range(self.num_tables)]
            self.tables.append(tbls)
            for table in tbls:
                table.cols.a.createIndex()
        self.stats = {'read': 0,
                      'write': 0}

    @synchronized(lock)
    def __del__(self):
        """Close the HDF5 file if it is still open."""
        if self.h5 is not None:
            self.h5.close()
            self.h5 = None

    def _pick_table(self):
        """Return a randomly chosen table (random group, then random table).

        Not synchronized itself; callers already hold the lock.
        """
        group_idx = random.randint(0, self.num_groups - 1)
        table_idx = random.randint(0, self.num_tables - 1)
        return self.tables[group_idx][table_idx]

    @synchronized(lock)
    def write(self):
        """Append one row of random values to a random table."""
        row = self._pick_table().row
        row['a'] = random.randint(0, 100)
        row['b'] = random.randint(0, 100)
        row.append()
        self.stats['write'] += 1

    @synchronized(lock)
    def read(self):
        """Flush the file, then run a random ``a > N`` query on a random table."""
        # flush so we can query latest data
        self.h5.flush()
        table = self._pick_table()
        # The query result is intentionally discarded; only the access
        # pattern matters for this test.
        table.readWhere('a > %d' % (random.randint(0, 100)))
        self.stats['read'] += 1
class Worker(threading.Thread):
    """Thread that calls ``method`` repeatedly until ``stop()`` is called."""

    def __init__(self, method):
        """Store the zero-argument callable *method* to invoke in the loop."""
        super(Worker, self).__init__()
        self.method = method
        self.stopEvt = threading.Event()

    def run(self):
        """Call ``method``, pause for a random 0-10 ms, repeat until stopped."""
        while not self.stopEvt.is_set():
            self.method()
            # Wait on the stop event instead of sleeping, so that stop()
            # ends the pause immediately.
            self.stopEvt.wait(random.random() / 100.0)

    def stop(self):
        """Ask the loop in ``run`` to exit after the current iteration."""
        self.stopEvt.set()
def main():
    """Run 10 writer and 10 reader threads for 5 seconds, then print stats."""
    t = Test()
    threads = [Worker(t.write) for _i in range(10)]
    threads.extend([Worker(t.read) for _i in range(10)])
    for thread in threads:
        thread.start()
    try:
        time.sleep(5)
    finally:
        # Always stop and join the workers, even if the sleep is
        # interrupted (e.g. by Ctrl-C), so no thread is left running.
        for thread in threads:
            thread.stop()
        for thread in threads:
            thread.join()
    # Parenthesized form is valid as both a Python 2 statement and a
    # Python 3 function call.
    print(t.stats)


if __name__ == "__main__":
    main()
------------------------------------------------------------------------------
LogMeIn Rescue: Anywhere, Anytime Remote support for IT. Free Trial
Remotely access PCs and mobile devices and provide instant support
Improve your efficiency, and focus on delivering more value-add services
Discover what IT Professionals Know. Rescue delivers
http://p.sf.net/sfu/logmein_12329d2d
_______________________________________________
Pytables-users mailing list
Pytables-users@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/pytables-users