I am trying to allow multiple threads read/write access to PyTables data
and found that it is necessary to call flush() before any read; otherwise
the latest data is not returned.  However, the flush can raise a
RuntimeError.  I have tried protecting PyTables access with both locks and
queues, as done by joshayers (
https://github.com/PyTables/PyTables/blob/develop/examples/multiprocess_access_queues.py).
In either case I still get "RuntimeError: dictionary changed size during
iteration" when doing the flush.  (Incidentally, using the locks appears to
be much faster than using queues in my unscientific tests...)

I have tried versions 2.4 and 2.3.1 with the same results.  Interestingly,
this only appears to happen if there are multiple tables/groups in the H5
file.  To investigate this behavior further I created a test program to
illustrate it (below).  When run with num_groups = 5 and num_tables = 5 (or
greater) I see the RuntimeError every time.  When these values are smaller
than this, the error does not occur (at least in a short test period).

I might be doing something unexpected with PyTables, but this seems pretty
straightforward to me.  Any help is appreciated.


import tables
import threading
import random
import time

# Module-level lock handed to @synchronized on the methods of Test.
lock = threading.Lock()


def synchronized(lock):
    """Return a decorator that runs the wrapped callable while holding *lock*.

    The lock is acquired before the call and released afterwards, whether the
    call returns or raises.  The wrapper forwards all positional and keyword
    arguments and returns the wrapped callable's result.

    The lock is a plain (non-reentrant) lock in this file, so a decorated
    callable must not call another callable guarded by the same lock.
    """
    # Local import: the file's import block does not bring in functools.
    import functools

    def wrap(f):
        # Keep the decorated function's __name__/__doc__ for tracebacks.
        @functools.wraps(f)
        def newFunction(*args, **kw):
            with lock:
                return f(*args, **kw)
        return newFunction
    return wrap

# Row description used for every table that Test creates.
class TableValue(tables.IsDescription):
    # 64-bit signed integer column, first in the row (pos=1).  Test builds an
    # index on this column and filters on it in its queries.
    a = tables.Int64Col(pos=1)
    # 32-bit unsigned integer column, second in the row (pos=2).
    b = tables.UInt32Col(pos=2)

class Test():
    """Shared HDF5 file that worker threads write to and query concurrently.

    The file holds ``num_groups`` groups, each containing ``num_tables``
    tables described by ``TableValue`` with an index on column ``a``.
    ``write``, ``read`` and ``__del__`` are wrapped with
    ``@synchronized(lock)``, so they run one at a time.

    ``stats`` counts the completed ``read`` and ``write`` calls.
    """

    def __init__(self, path='/data/test.h5', num_groups=5, num_tables=5):
        """Create the file, its groups and its indexed tables.

        path       -- file to create; it is opened with mode 'w'.
        num_groups -- number of top-level groups ("group0", "group1", ...).
        num_tables -- number of tables per group ("table0", "table1", ...).
        """
        # Assigned first so __del__ stays safe if openFile() raises.
        self.h5 = None
        self.h5 = tables.openFile(path, mode='w')
        self.num_groups = num_groups
        self.num_tables = num_tables

        self.groups = [self.h5.createGroup('/', "group%d" % i)
                       for i in range(self.num_groups)]
        # self.tables[g][t] is table number t inside group number g.
        self.tables = []
        for group in self.groups:
            tbls = [self.h5.createTable(group, 'table%d' % i, TableValue)
                    for i in range(self.num_tables)]
            self.tables.append(tbls)
            for table in tbls:
                table.cols.a.createIndex()

        self.stats = {'read': 0,
                      'write': 0}

    def _pick_table(self):
        """Return one of the tables, chosen at random (group, then table)."""
        group_idx = random.randrange(self.num_groups)
        table_idx = random.randrange(self.num_tables)
        return self.tables[group_idx][table_idx]

    @synchronized(lock)
    def __del__(self):
        """Close the file if it is still open."""
        if self.h5 is not None:
            self.h5.close()
            self.h5 = None

    @synchronized(lock)
    def write(self):
        """Append one row of random values to a randomly chosen table."""
        x = self._pick_table().row
        x['a'] = random.randint(0, 100)
        x['b'] = random.randint(0, 100)
        x.append()
        self.stats['write'] += 1

    @synchronized(lock)
    def read(self):
        """Flush the file, then run one random query on a random table."""
        # flush so we can query latest data
        self.h5.flush()

        table = self._pick_table()
        # The query result is not needed; only the access itself matters.
        table.readWhere('a > %d' % (random.randint(0, 100)))

        self.stats['read'] += 1

class Worker(threading.Thread):
    """Thread that calls ``method`` repeatedly until ``stop()`` is called."""

    def __init__(self, method):
        """Remember the callable to run and create the stop flag."""
        super(Worker, self).__init__()
        self.stopEvt = threading.Event()
        self.method = method

    def stop(self):
        """Set the stop flag; ``run`` exits the next time it checks it."""
        self.stopEvt.set()

    def run(self):
        """Call ``method``, sleep a random 0-10 ms, repeat until stopped."""
        while True:
            if self.stopEvt.is_set():
                break
            self.method()
            pause = random.random() / 100.0
            time.sleep(pause)

def main(duration=5, num_writers=10, num_readers=10):
    """Run writer and reader threads against one Test file and print counts.

    duration    -- seconds to let the workers run before stopping them.
    num_writers -- number of threads that repeatedly call Test.write.
    num_readers -- number of threads that repeatedly call Test.read.
    """
    t = Test()

    threads = [Worker(t.write) for _i in range(num_writers)]
    threads.extend(Worker(t.read) for _i in range(num_readers))

    # Only threads that actually started may be joined.
    started = []
    try:
        for thread in threads:
            thread.start()
            started.append(thread)

        time.sleep(duration)
    finally:
        # The workers loop until told to stop, so always signal and join
        # them; otherwise an exception or Ctrl-C during the sleep would
        # leave them running and keep the process alive.
        for thread in started:
            thread.stop()

        for thread in started:
            thread.join()

    # Parenthesised form prints the same under the Python 2 print statement.
    print(t.stats)


if __name__ == "__main__":
    main()
------------------------------------------------------------------------------
LogMeIn Rescue: Anywhere, Anytime Remote support for IT. Free Trial
Remotely access PCs and mobile devices and provide instant support
Improve your efficiency, and focus on delivering more value-add services
Discover what IT Professionals Know. Rescue delivers
http://p.sf.net/sfu/logmein_12329d2d
_______________________________________________
Pytables-users mailing list
Pytables-users@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/pytables-users

Reply via email to