Dear all,

I am having some trouble using PyTables correctly, and I was hoping for
some guidance. I would like to have one central PyTables file, containing a
VLArray, that is used by several "worker" processes. Each process should
perform some computation and append the result as a new row to the VLArray.
Because the results can be large, it would be impractical to pass them back
to the main process for it to store in the PyTables file.
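
For reference, the single-process version of this pattern behaves as
expected; a minimal sketch (throwaway file name, otherwise the same calls
the attached script uses):

import numpy
import tables

h5 = tables.openFile("single.h5", mode="w")
traces = h5.createVLArray("/", "traces", atom=tables.Float64Atom(shape=(3,)))
traces.append(numpy.random.random((5, 3)))  # one row: 5 vectors of length 3
traces.append(numpy.random.random((8, 3)))  # another row, different length
print traces.shape[0]                       # prints 2, as expected
h5.close()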

I wrote a mock-up script that demonstrates my problem. Essentially, each
process appears to see a separate copy of the VLArray and acts as if it were
the only one writing to it. After all processes are joined, the original file
appears unmodified, even though each process reported its writes as
successful.
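
My guess is that this is ordinary copy-on-fork behaviour: whatever state a
child process inherits is a copy, so its mutations never propagate back to
the parent. A minimal sketch of that effect with a plain Python list (nothing
PyTables-specific, just to illustrate what I suspect is happening to the open
file handle):

import multiprocessing

def appender(lst):
    lst.append(42)           # mutates the child's copy only
    print "child sees", lst  # -> child sees [42]

if __name__ == "__main__":
    data = []
    p = multiprocessing.Process(target=appender, args=(data,))
    p.start()
    p.join()
    print "parent sees", data  # -> parent sees []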


This happens on both machines I tried: CentOS Linux on a 64-bit 8-core
machine, and Mac OS X Snow Leopard on a 2-core machine.

Linux versions:
-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
PyTables version:  2.2
HDF5 version:      1.8.5
NumPy version:     1.4.1
Numexpr version:   1.4 (not using Intel's VML/MKL)
Zlib version:      1.2.5 (in Python interpreter)
LZO version:       2.03 (Apr 30 2008)
BZIP2 version:     1.0.5 (10-Dec-2007)
Blosc version:     1.0 (2010-07-01)
Python version:    2.6 (r26:66714, Aug 10 2010, 17:24:31) 
[GCC 4.5.0]
Platform:          linux2-x86_64
Byte-ordering:     little
Detected cores:    8
-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=

Mac OS X versions:
-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
PyTables version:  2.1.2
HDF5 version:      1.8.4
NumPy version:     1.4.0
Zlib version:      1.2.3
BZIP2 version:     1.0.5 (10-Dec-2007)
Python version:    2.6.5 |EPD 6.2-2 (64-bit)| (r265:79063, May 28 2010, 
15:24:16) 
[GCC 4.0.1 (Apple Inc. build 5488)]
Platform:          darwin-i386
Byte-ordering:     little
-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=


I'm attaching the script I wrote. You can run it with "python
testvlparallel.py NofProcesses NofTasks". It is not strictly *minimal*, but
it mimics what I am trying to achieve closely enough to be representative.
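
In case it helps frame the question: the one workaround I can think of is to
have each worker open the file itself around every append, so that no open
HDF5 handle ever crosses the fork boundary. A rough, untested sketch (reusing
the file and node names from the attached script):

import numpy
import tables

def append_trace(lock, N):
    # open/append/close per write; nothing HDF5-related survives between calls
    newarray = numpy.random.random( (N,3) )
    lock.acquire()
    try:
        h5 = tables.openFile( "testfile", mode="a" )
        h5.root.traces.append( newarray )
        h5.close()
    finally:
        lock.release()

Would that be the recommended pattern, or is there a better-supported way to
write to one file from several processes?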

Best,
Marko Budisic

"""
Pass number of workers as the first argument, number of tasks as the second.

e.g.
python testvlparallel.py 2 10
"""

import sys
from numpy import *
from tables import *
from time import sleep
import multiprocessing as parallel 

class Worker( parallel.Process ):

    def __init__(self, myno, archive, lock, inqueue):
        parallel.Process.__init__(self)
        self.daemon = True
        self.myno = myno
        self.archive = archive
        self.lock = lock
        self.inqueue = inqueue

    def run(self):

        while True:
            task = self.inqueue.get()
            if task is None: # poison pill
                print "Worker %d quitting" % self.myno
                break
            else:
                self.doit( task )

    def doit(self, N):
        # tasks carry the matrix width N = job number + 2 (jobs numbered from 1)
        print "Worker %d starting job %d with %d in archive." % (self.myno, N-2,
                                                                 self.archive.shape[0])
        newarray = random.random( (N,3) )
        self.lock.acquire()
        try:
            self.archive.append(newarray)
            self.archive.flush()
        finally:
            self.lock.release()
        sleep(0.1)
        print "Worker %d ending job %d with %d in archive." % (self.myno, N-2,
                                                               self.archive.shape[0])


def main(argv=None):
    # set up an archive
    h5file = openFile( "testfile", mode="w", NODE_CACHE_SLOTS=0 )

    # elements of the VLArray should be numerical matrices of shape
    # (variable N) x 3, i.e. rows of Float64 3-vectors
    myatom = Float64Atom( shape=(3,) )

    # create a new VLArray
    traces = h5file.createVLArray( where='/',
                                   name='traces',
                                   atom=myatom )

    h5file.flush() # perhaps unnecessary
    print traces, traces.atom

    noofworkers = int(argv[0])
    totaltasks = int(argv[1])
    taskqueue = parallel.Queue()
    lock = parallel.Lock()


    # create tasks - widths of random matrices
    for n in xrange(totaltasks):
        taskqueue.put( n + 3 )

    # "poison pills" for workers
    for k in xrange(noofworkers):
        taskqueue.put( None )    

    # create and run workers
    workers = [ Worker(k+1, traces, lock, taskqueue) for k in xrange(noofworkers) ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()

    # number of stored matrices should equal the number of tasks specified
    try:
        assert traces.shape[0] == totaltasks
    except AssertionError:
        print "FAIL: total number of archived traces: %d. Desired: %d" % (traces.shape[0],
                                                                          totaltasks)
    else:
        print "SUCCESS: total number of archived traces: %d" % traces.shape[0]

    h5file.close()


if __name__=="__main__":
    sys.exit( main(sys.argv[1:]) )
