Hello, Julien.

JA> not yet. Be sure I'll keep you posted when it's done. I've just been
JA> really busy here last week with projects deadlines.

Please try this test case and see whether it reproduces the bug. Please use the
updated PythonDirectory.py.

--
Yura Smolsky,
http://altervisionmedia.com/
#!/usr/bin/python2.4

import os, shutil, time
from unittest import TestCase, main
from PyLucene import *
import threading
from index import PythonDirectory

class PythonDirTestCase(TestCase):
    """Threaded indexing/searching tests run against PythonFileDirectory.

    Every test builds its index under PATH; tearDown() removes that
    directory again.
    """

    # Index location, relative to the current working directory.
    PATH = 'indexTest'

    def setUp(self):
        # `total` counts the operations completed by worker threads and
        # `lock` serialises the updates to it (see _increment).
        self.lock = threading.Lock()
        self.total = 0

    def tearDown(self):
        if os.path.exists(self.PATH):
            shutil.rmtree(self.PATH)

    def testLockTimeout(self):
        """Open a second IndexWriter while the first one is still open.

        A helper thread closes the first writer after half of the value
        reported by getWriteLockTimeout(); presumably the second writer has
        to wait for the write lock until then -- confirm against the
        IndexWriter documentation.
        """
        dirIndex = PythonDirectory.PythonFileDirectory.getDirectory(
            self.PATH, True)
        analyzer = PerFieldAnalyzerWrapper(StandardAnalyzer())

        # First writer: creates the index and stays open for now.
        writer = IndexWriter(dirIndex, analyzer, True)
        writer.setUseCompoundFile(False)
        self._indexDoc(writer, {'id': 1})
        self._indexDoc(writer, {'id': 2})

        # Half of the write-lock timeout, divided by 1000 because the value
        # is handed to time.sleep() (seconds) in _closeIndex.
        delay = float(writer.getWriteLockTimeout()/2)/1000

        th = PythonThread(target=self._closeIndex,
                          args=(writer, dirIndex, delay))
        th.start()
        try:
            # Second writer on the same path, opened without re-creating
            # the index.
            dirIndex2 = PythonDirectory.PythonFileDirectory.getDirectory(
                self.PATH, False)
            writer2 = IndexWriter(dirIndex2, analyzer, False)
            writer2.setUseCompoundFile(False)

            self._indexDoc(writer2, {'id': 3})
            self._indexDoc(writer2, {'id': 4})

            writer2.close()
            dirIndex2.close()
        finally:
            # Wait for the closer thread so it cannot still be running when
            # tearDown() deletes the index directory.
            th.join()

    def testMultiThreadIndexingSearching(self):
        """Add documents from many threads, then search for each of them
        from many threads, checking that every operation completed."""
        dirIndex = PythonDirectory.PythonFileDirectory.getDirectory(
            self.PATH, True)
        analyzer = PerFieldAnalyzerWrapper(StandardAnalyzer())
        writer = IndexWriter(dirIndex, analyzer, True)
        writer.setUseCompoundFile(False)

        self.total = 0
        tmax = 101

        # One indexing thread per document, ids 10000 .. 10000 + tmax - 1.
        threads = []
        for i in xrange(tmax):
            threads.append(PythonThread(
                target=self._indexDoc,
                args=(writer, {'id': 10000+i})))

        for thread in threads:
            thread.start()

        for thread in threads:
            thread.join()

        self.assertEqual(self.total, tmax)

        writer.close()
        dirIndex.close()

        dirIndex = PythonDirectory.PythonFileDirectory.getDirectory(
            self.PATH, False)
        #dirIndex = FSDirectory.getDirectory(self.PATH, False)
        searcher = IndexSearcher(dirIndex)
        self.total = 0

        # One search thread per document id, all sharing a single searcher.
        threads = []
        for i in xrange(tmax):
            threads.append(PythonThread(
                target=self._searchDoc,
                args=(searcher, 10000+i, )))

        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

        searcher.close()
        dirIndex.close()
        self.assertEqual(self.total, tmax)

    def _increment(self):
        """Add one to self.total while holding self.lock."""
        self.lock.acquire()
        try:
            self.total += 1
        finally:
            # Release even if the increment raises, so that other worker
            # threads are not left blocked on the lock.
            self.lock.release()

    def _indexDoc(self, writer, fields):
        """Add one document to `writer` and count it.

        `fields` is a dict; only fields['id'] is used. It is stored as an
        untokenized "id" field next to a fixed, tokenized "content" field.
        """
        doc = Document()
        doc.add(Field("id", str(fields['id']), Field.Store.YES,
                      Field.Index.UN_TOKENIZED))
        doc.add(Field("content",
                      "some dummy content goes here. "
                      "text text text text text text text text",
                      Field.Store.YES,
                      Field.Index.TOKENIZED))
        writer.addDocument(doc)
        self._increment()

    def _searchDoc(self, searcher, query):
        """Search for the document whose "id" equals str(query), expect
        exactly one hit, and count the completed search."""
        qp = TermQuery(Term('id', str(query)))
        # The search runs without holding self.lock, so searches issued by
        # different threads can overlap.
        hits = searcher.search(qp)
        self.assertEqual(1, hits.length())
        self._increment()

    def _closeIndex(self, writer, store, delay):
        """Sleep for `delay` seconds, then close `writer` and `store`."""
        time.sleep(delay)
        writer.close()
        store.close()
        
    #####
        #store = FSDirectory.getDirectory(self.STORE_DIR+'/index', False)
        #store = PythonFileDirectory(self.STORE_DIR+'/index', False)
        #searcher = IndexSearcher(store)

        #qp = TermQuery(Term('lang', 'en'))
        #hits = searcher.search(qp)
        #self.assertEqual(2, hits.length())

        #prefixQ = PrefixQuery(Term('outlink', 'livejournal.com/users/avb'))
        #hits = searcher.search(prefixQ)
        #print '\n'+prefixQ.toString()
        #for h in range(hits.length()):
            #doc =  hits.doc(h)
            #print doc.getValues('outlink')
            #print doc.getValues('outtitle')
        #self.assertEqual(1, hits.length())
        
        #prefixQ = PrefixQuery(Term('outlink', 'livejournal.com/users'))
        #hits = searcher.search(prefixQ)
        #self.assertEqual(2, hits.length())

        #prefixQ = PrefixQuery(Term('outlink', 'buddy.blogger.com'))
        #hits = searcher.search(prefixQ)
        #print '\n'+prefixQ.toString()
        #for h in range(hits.length()):
            #doc =  hits.doc(h)
            #print doc.getValues('outlink')
            #print doc.getValues('outtitle')
            #print doc.getValues('tag')
        #self.assertEqual(2, hits.length())
        
        #searcher.close()

# Script entry point. With "-loop" on the command line the unittest runner is
# invoked over and over; without it the suite runs once.
if __name__ == "__main__":
    import sys
    if '-loop' in sys.argv:
        # Remove the flag so that unittest's main() does not see it.
        sys.argv.remove('-loop')
        while True:
            try:
                main()
            except KeyboardInterrupt:
                # Let Ctrl-C stop the loop instead of being swallowed.
                raise
            except (SystemExit, Exception):
                # Keep looping whatever the outcome of a run was; SystemExit
                # is listed explicitly so that an exit request raised by the
                # runner does not end the loop.
                pass
    else:
        main()
import os, sys
import md5
import time
import threading
import PyLucene

# When true, the `if DEBUG:` block at the end of this module rebinds
# PythonFileDirectory, PythonFileStream and PythonFileLock to DebugFactory
# instances, so that instances are created wrapped in DebugWrapper.
DEBUG = False

class DebugWrapper( object ):
    """Proxy that traces attribute access on a wrapped object.

    Each attribute that is not found on the proxy itself is looked up on the
    wrapped object, after writing one line to stdout of the form
    "<class name> <object name> <attribute name>".
    """

    def __init__(self, obj ):
        self.obj = obj

    def __getattr__(self, name):
        # __getattr__ only runs when normal lookup fails.  If `obj` itself is
        # missing (for example on an instance created without __init__),
        # reading self.obj below would re-enter this method without end.
        if name == 'obj':
            raise AttributeError(name)
        target = self.obj
        # Objects without a `name` attribute are traced with a placeholder so
        # that tracing never makes the real lookup fail.
        label = getattr(target, 'name', '?')
        sys.stdout.write('%s %s %s\n' % (target.__class__.__name__,
                                         label, name))
        sys.stdout.flush()
        return getattr(target, name)
        
class DebugFactory( object ):
    """Callable stand-in for a class that returns traced instances.

    Calling the factory instantiates the wrapped class and returns the new
    instance wrapped in a DebugWrapper.  Any other attribute is looked up on
    the wrapped class, so class-level names (constants, static methods) stay
    reachable through the factory.
    """

    def __init__(self, klass):
        self.klass = klass

    def __call__(self, *args, **kw):
        instance = self.klass(*args, **kw)
        return DebugWrapper( instance )

    def __getattr__(self, name):
        # Only reached when normal lookup fails.  Guard `klass` itself so a
        # factory created without __init__ cannot recurse here.
        if name == 'klass':
            raise AttributeError(name)
        return getattr(self.klass, name)

class PythonFileLock( object ):
    """Lock represented by the existence of a file.

    The lock is held while `lockFile` exists inside `lockDir`.  The file is
    created with O_CREAT | O_EXCL so that "check that it is absent" and
    "create it" are a single operation; this is what allows the lock to be
    shared between processes.  `mutex` additionally serialises the file
    operations made through this one lock object.
    """

    # Polling granularity, in milliseconds, used by obtainTimeout() to decide
    # how many retries fit into the timeout.
    LOCK_POLL_INTERVAL = 100

    # Permission bits for a new lock file: 438 decimal == 0666 octal, i.e.
    # read/write for everybody before the umask is applied.  Written in
    # decimal because the octal literal syntax differs between Python
    # versions.
    _LOCK_FILE_MODE = 438

    def __init__(self, lockDir, lockFile):
        self.name = lockFile
        self.lockDir = lockDir
        self.lockFile = os.path.join(lockDir, lockFile)
        self.mutex = threading.Lock()

    def isLocked(self):
        """Return True if the lock file currently exists."""
        self.mutex.acquire()
        try:
            return os.path.exists(self.lockFile)
        finally:
            self.mutex.release()

    def obtainTimeout( self, timeout ):
        """Try to obtain the lock, retrying for about `timeout` milliseconds.

        Returns True once the lock is obtained.  Raises Exception with a
        "Lock obtain timed out" message when all retries are used up.
        """
        locked = self.obtain()
        # Number of retries that fit into the timeout.
        maxSleepCount = int(timeout // self.LOCK_POLL_INTERVAL)
        sleepCount = 0
        while not locked:
            if sleepCount >= maxSleepCount:
                raise Exception("Lock obtain timed out: " + self.toString())
            # Spread the whole timeout (ms -> s) evenly over the retries.
            # maxSleepCount cannot be zero here because of the check above.
            time.sleep(float(timeout) / 1000 / maxSleepCount)
            locked = self.obtain()
            sleepCount += 1
        return locked

    def obtain( self ):
        """Try once to take the lock.

        Returns True if the lock file was created by this call, False if it
        already exists or could not be created.
        """
        if not os.path.exists(self.lockDir):
            try:
                os.makedirs(self.lockDir)
            except OSError:
                # Someone else may have created the directory between the
                # check and makedirs(); only a missing directory is an error.
                if not os.path.isdir(self.lockDir):
                    raise

        self.mutex.acquire()
        try:
            try:
                # O_EXCL makes the call fail if the file already exists.
                fd = os.open(self.lockFile,
                             os.O_CREAT | os.O_EXCL | os.O_WRONLY,
                             self._LOCK_FILE_MODE)
            except OSError:
                return False
            # Only the existence of the file matters; close the descriptor
            # right away instead of leaving it to garbage collection.
            os.close(fd)
            return True
        finally:
            self.mutex.release()

    def release( self ):
        """Remove the lock file; always returns True.

        A lock file that is already gone is not treated as an error.
        """
        self.mutex.acquire()
        try:
            try:
                os.remove(self.lockFile)
            except OSError:
                # Re-raise only if the file is still there, i.e. the removal
                # itself failed.
                if os.path.exists(self.lockFile):
                    raise
            return True
        finally:
            self.mutex.release()

    def toString(self):
        return 'Lock@' + self.lockFile


class PythonFileStream(object):
    """Thin wrapper around an open file object that also tracks its length.

    name -- name of the file within its directory
    fh   -- open file object
    size -- length of the file when the stream is created (0 for a new file)
    """

    def __init__(self, name, fh, size=0):
        self.name = name
        self.fh = fh
        self._length = size
        self.isOpen = True

    def close(self, isClone=False):
        """Close the underlying file.

        Does nothing when called with isClone true or when the stream is
        already closed.
        """
        if isClone or not self.isOpen:
            return
        self.isOpen = False
        self.fh.close()

    def seek(self, pos):
        self.fh.seek(pos)

    def read(self, length, pos):
        """Return up to `length` bytes read from absolute position `pos`."""
        self.fh.seek(pos)
        return self.fh.read(length)

    def write(self, buffer):
        """Write `buffer` at the current position and flush it."""
        self.fh.write(buffer)
        self.fh.flush()
        # A write that follows a backwards seek() overwrites existing bytes,
        # so the length only grows when the write ends beyond the old end.
        end = self.fh.tell()
        if end > self._length:
            self._length = end

    def length(self):
        return self._length

# Cache of directories that have already been opened, keyed by the resolved
# path (os.path.realpath); read and filled by
# PythonFileDirectory.getDirectory().
DIRECTORIES = {}
# Held by getDirectory() while it looks up or inserts into DIRECTORIES.
DIR_MUTEX = threading.Lock()
        
class PythonFileDirectory( object ):
    """Index directory stored as plain files under one file-system path.

    The method names (createOutput, openInput, makeLock, ...) presumably
    mirror the Lucene Directory interface that PyLucene calls back into --
    confirm against the PyLucene documentation.
    """

    # Directory in which lock files are created: the value of the
    # "org.apache.lucene.lockDir" system property, with "java.io.tmpdir"
    # passed as the second getProperty() argument (presumably the default).
    # __init__ falls back to the index path when this is empty.
    LOCK_DIR = PyLucene.System.getProperty("org.apache.lucene.lockDir",
      PyLucene.System.getProperty("java.io.tmpdir"))

    @staticmethod
    def getDirectory(path, create=False):
        """ Returns the directory instance for the named location.
        Directories are cached, so that, for a given canonical path, the same
        instance will always be returned.  This permits
        synchronization on directories.

        With `create` true the directory is (re)initialised via create()."""
        path = os.path.realpath(path)
        DIR_MUTEX.acquire()
        try:
            cached = path in DIRECTORIES
            if cached:
                directory = DIRECTORIES[path]
            else:
                # The constructor already runs create() when asked to.
                directory = PythonFileDirectory(path, create)
                DIRECTORIES[path] = directory
        finally:
            DIR_MUTEX.release()
        # Only a cached instance still needs the explicit create(); running
        # it again on a brand-new instance would repeat the same work.
        if cached and create:
            directory.create()
        return directory

    def __init__(self, path, create=False ):
        self.path = os.path.realpath(path)
        self.name = self.path
        self.mutexRename = threading.Lock()
        self._locks = {}     # lock name -> PythonFileLock, see makeLock()
        self._streams = []   # streams handed out since the last close()
        if not self.LOCK_DIR:
            self.LOCK_DIR = self.path
        if create:
            self.create()

        assert os.path.isdir( path ), 'not a directory: %s' % (path,)

    def create(self):
        """Make sure the index directory exists and is empty, and remove this
        directory's lock files from LOCK_DIR."""
        if not os.path.exists(self.path):
            os.makedirs(self.path)

        for oldFile in os.listdir(self.path):
            os.remove(os.path.join(self.path, oldFile))

        # Lock files of this directory all start with the same prefix.
        lockPrefix = self.getLockPrefix()
        for tmpFile in os.listdir(self.LOCK_DIR):
            if tmpFile.startswith(lockPrefix):
                os.remove(os.path.join(self.LOCK_DIR, tmpFile))

    def close(self):
        """Close every stream handed out by createOutput()/openInput()."""
        # Detach the list first: instances are cached and reused by
        # getDirectory(), so streams that are already closed should not pile
        # up across close() calls.
        streams, self._streams = self._streams, []
        for s in streams:
            s.close()

    def createOutput(self, name ):
        """Create (or truncate) file `name` and return a stream for it."""
        file_path = os.path.join( self.path, name )
        # Binary mode: text mode translates line endings on Windows, which
        # would corrupt index data.
        fh = open( file_path, "wb" )
        stream = PythonFileStream( name, fh )
        self._streams.append(stream)
        return stream

    def deleteFile( self, name ):
        """Delete file `name` if it exists."""
        if self.fileExists(name):
            os.unlink( os.path.join( self.path, name ) )

    def fileExists( self, name ):
        return os.path.exists( os.path.join( self.path, name ) )

    def fileLength( self, name ):
        """Return the size of file `name` in bytes."""
        file_path = os.path.join( self.path, name )
        return os.path.getsize( file_path )

    def fileModified( self, name ):
        """Return the modification time of file `name`, truncated to whole
        seconds."""
        file_path = os.path.join( self.path, name )
        return int( os.path.getmtime( file_path ))

    def list(self):
        """Return the names of all entries in the index directory."""
        return os.listdir( self.path )

    def openInput( self, name ):
        """Open existing file `name` for reading and return a stream for it."""
        file_path = os.path.join( self.path, name )
        # Binary mode, for the same reason as in createOutput().
        fh = open( file_path, 'rb')
        stream = PythonFileStream( name, fh, os.path.getsize(file_path) )
        self._streams.append(stream)
        return stream

    def renameFile(self, fname, tname):
        """Rename `fname` to `tname`, replacing `tname` if it exists."""
        self.mutexRename.acquire()
        try:
            fromName = os.path.join( self.path, fname )
            toName = os.path.join( self.path, tname )
            # Remove the target first; os.rename() does not overwrite an
            # existing file on every platform.
            if os.path.exists( toName ):
                os.remove( toName )
            os.rename( fromName, toName )
        finally:
            self.mutexRename.release()

    def touchFile( self, name):
        """Set the access and modification times of file `name` to now."""
        file_path = os.path.join( self.path, name )
        # 'rw' is not a valid open() mode, so rewriting the first byte could
        # not work; os.utime(path, None) stamps the file with the current
        # time directly.
        os.utime( file_path, None )

    def makeLock( self, name ):
        """Return the PythonFileLock for `name`; repeated calls with the same
        name return the same lock object."""
        lock = self._locks.get( name )
        if lock is None:
            lockFile = self.getLockPrefix() + '-' + name
            # setdefault keeps the first lock registered under the name if
            # two callers get here at the same time.
            lock = self._locks.setdefault(
                name, PythonFileLock(self.LOCK_DIR, lockFile) )
        return lock

    def getHexDigest(self, string):
        """Return the MD5 digest of `string` as a hexadecimal string."""
        return md5.new(string).hexdigest()

    def getLockPrefix(self):
        """Return 'lucene-' followed by the MD5 hex digest of the resolved
        index path."""
        dirName = os.path.realpath(self.path)
        return 'lucene-' + self.getHexDigest(dirName)

# With DEBUG enabled, rebind the three public classes to DebugFactory
# instances so that every instance created afterwards is a traced proxy.
if DEBUG:
    for _traced in ('PythonFileDirectory', 'PythonFileStream',
                    'PythonFileLock'):
        globals()[_traced] = DebugFactory( globals()[_traced] )
    # Do not leave the loop variable behind as a module attribute.
    del _traced
_______________________________________________
pylucene-dev mailing list
[email protected]
http://lists.osafoundation.org/mailman/listinfo/pylucene-dev

Reply via email to