Hello, Julien.
JA> not yet. Be sure I'll keep you posted when it's done. I've just been
JA> really busy here last week with project deadlines.
Please try this test case to see if it reproduces the bug. Please use the updated
PythonDirectory.py
--
Yura Smolsky,
http://altervisionmedia.com/
#!/usr/bin/python2.4
import os, shutil, time
from unittest import TestCase, main
from PyLucene import *
import threading
from index import PythonDirectory
class PythonDirTestCase(TestCase):
PATH = 'indexTest'
def setUp(self):
self.lock = threading.Lock()
self.total = 0
def tearDown(self):
if os.path.exists(self.PATH):
shutil.rmtree(self.PATH)
def testLockTimeout(self):
dirIndex = PythonDirectory.PythonFileDirectory.getDirectory(self.PATH,
True)
analyzer = PerFieldAnalyzerWrapper(StandardAnalyzer())
#print "open first IndexWriter"
writer = IndexWriter(dirIndex, analyzer, True)
writer.setUseCompoundFile(False)
self._indexDoc(writer, {'id': 1})
self._indexDoc(writer, {'id': 2})
delay = float(writer.getWriteLockTimeout()/2)/1000
#print "close first IndexWriter after delay, ms:", delay
th = PythonThread(target=self._closeIndex, args=(writer, dirIndex,
delay))
th.start()
#print "open second IndexWriter"
dirIndex2 = PythonDirectory.PythonFileDirectory.getDirectory(self.PATH,
False)
writer2 = IndexWriter(dirIndex2, analyzer, False)
writer2.setUseCompoundFile(False)
self._indexDoc(writer2, {'id': 3})
self._indexDoc(writer2, {'id': 4})
writer2.close()
dirIndex2.close()
def testMultiThreadIndexingSearching(self):
dirIndex = PythonDirectory.PythonFileDirectory.getDirectory(self.PATH,
True)
analyzer = PerFieldAnalyzerWrapper(StandardAnalyzer())
writer = IndexWriter(dirIndex, analyzer, True)
writer.setUseCompoundFile(False)
self.total = 0
tmax = 101
threads = []
for i in xrange(tmax):
threads.append(PythonThread(
target=self._indexDoc,
args=(writer, {'id': 10000+i})))
for thread in threads:
thread.start()
for thread in threads:
thread.join()
self.assertEqual(self.total, tmax)
writer.close()
dirIndex.close()
dirIndex = PythonDirectory.PythonFileDirectory.getDirectory(self.PATH,
False)
#dirIndex = FSDirectory.getDirectory(self.PATH, False)
searcher = IndexSearcher(dirIndex)
self.total = 0
threads = []
for i in xrange(tmax):
threads.append(PythonThread(
target=self._searchDoc,
args=(searcher, 10000+i, )))
for thread in threads:
thread.start()
for thread in threads:
thread.join()
searcher.close()
dirIndex.close()
self.assertEqual(self.total, tmax)
def _increment(self):
self.lock.acquire()
self.total += 1
self.lock.release()
def _indexDoc(self, writer, fields):
doc = Document()
doc.add(Field("id", str(fields['id']), Field.Store.YES,
Field.Index.UN_TOKENIZED))
doc.add(Field("content",
"some dummy content goes here. text text text text text
text text text",
Field.Store.YES,
Field.Index.TOKENIZED))
writer.addDocument(doc)
self._increment()
def _searchDoc(self, searcher, query):
searcher1 = searcher
qp = TermQuery(Term('id', str(query)))
#self.lock.acquire()
hits = searcher1.search(qp)
#self.lock.release()
self.assertEqual(1, hits.length())
self._increment()
def _closeIndex(self, writer, store, delay):
time.sleep(delay)
writer.close()
store.close()
#print "IndexWriter closed"
#####
#store = FSDirectory.getDirectory(self.STORE_DIR+'/index', False)
#store = PythonFileDirectory(self.STORE_DIR+'/index', False)
#searcher = IndexSearcher(store)
#qp = TermQuery(Term('lang', 'en'))
#hits = searcher.search(qp)
#self.assertEqual(2, hits.length())
#prefixQ = PrefixQuery(Term('outlink', 'livejournal.com/users/avb'))
#hits = searcher.search(prefixQ)
#print '\n'+prefixQ.toString()
#for h in range(hits.length()):
#doc = hits.doc(h)
#print doc.getValues('outlink')
#print doc.getValues('outtitle')
#self.assertEqual(1, hits.length())
#prefixQ = PrefixQuery(Term('outlink', 'livejournal.com/users'))
#hits = searcher.search(prefixQ)
#self.assertEqual(2, hits.length())
#prefixQ = PrefixQuery(Term('outlink', 'buddy.blogger.com'))
#hits = searcher.search(prefixQ)
#print '\n'+prefixQ.toString()
#for h in range(hits.length()):
#doc = hits.doc(h)
#print doc.getValues('outlink')
#print doc.getValues('outtitle')
#print doc.getValues('tag')
#self.assertEqual(2, hits.length())
#searcher.close()
if __name__ == "__main__":
    import sys
    if '-loop' in sys.argv:
        # Run the suite forever (stress mode for shaking out races).
        sys.argv.remove('-loop')
        while True:
            try:
                main()
            except KeyboardInterrupt:
                # BUGFIX: the bare `except` also swallowed Ctrl-C, making
                # the loop impossible to stop; let the user break out.
                break
            except (SystemExit, Exception):
                # unittest.main() exits via SystemExit; keep looping.
                pass
    else:
        main()
import os, sys
import md5
import time
import threading
import PyLucene
DEBUG = False
class DebugWrapper(object):
    """Proxy that logs every attribute access on the wrapped object.

    The wrapped object is expected to expose a `name` attribute (the
    directory/stream/lock classes in this module all do).
    """

    def __init__(self, obj):
        self.obj = obj

    def __getattr__(self, name):
        # BUGFIX: Python-2-only `print` statement rewritten in a form that
        # is valid (and prints identically) under both Python 2 and 3.
        print("%s %s %s" % (self.obj.__class__.__name__,
                            self.obj.name, name))
        sys.stdout.flush()
        return getattr(self.obj, name)
class DebugFactory(object):
    """Callable that builds instances of `klass` wrapped in a
    DebugWrapper tracing proxy."""

    def __init__(self, klass):
        self.klass = klass

    def __call__(self, *args, **kw):
        # Construct the real object, then hand back the tracing proxy.
        return DebugWrapper(self.klass(*args, **kw))
class PythonFileLock(object):
    """File-based lock intended to be safe across multiple processes.

    The lock is held while its lock file exists on disk.  An in-process
    mutex serializes filesystem checks between threads sharing this
    instance; cross-process atomicity comes from O_CREAT | O_EXCL.
    """

    # Polling interval, in milliseconds, used by obtainTimeout().
    LOCK_POLL_INTERVAL = 100

    def __init__(self, lockDir, lockFile):
        self.name = lockFile
        self.lockDir = lockDir
        self.lockFile = os.path.join(lockDir, lockFile)
        self.mutex = threading.Lock()

    def isLocked(self):
        """Return True if the lock file currently exists."""
        self.mutex.acquire()
        try:
            return os.path.exists(self.lockFile)
        finally:
            self.mutex.release()

    def obtainTimeout(self, timeout):
        """Try to obtain the lock, polling for up to `timeout` ms.

        Raises Exception when the lock cannot be obtained in time.
        """
        locked = self.obtain()
        maxSleepCount = round(timeout / self.LOCK_POLL_INTERVAL)
        sleepCount = 0
        while not locked:
            if sleepCount >= maxSleepCount:
                raise Exception("Lock obtain timed out: " + self.toString())
            time.sleep(float(timeout) / 1000 / maxSleepCount)
            locked = self.obtain()
            sleepCount += 1
        return locked

    def obtain(self):
        """Attempt to acquire the lock once; return True on success.

        BUGFIX: the previous exists-check followed by open(..., 'w') was
        not atomic, so two processes could both "obtain" the lock (and
        the open file handle was never closed).  O_CREAT | O_EXCL makes
        creation fail atomically when the file already exists.
        """
        if not os.path.exists(self.lockDir):
            os.makedirs(self.lockDir)
        self.mutex.acquire()
        try:
            try:
                fd = os.open(self.lockFile,
                             os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            except OSError:
                # Lock file already exists (or cannot be created).
                return False
            os.close(fd)
            return True
        finally:
            self.mutex.release()

    def release(self):
        """Release the lock by removing its lock file."""
        self.mutex.acquire()
        try:
            os.remove(self.lockFile)
            return True
        finally:
            self.mutex.release()

    def toString(self):
        return 'Lock@' + self.lockFile
class PythonFileStream(object):
    """Wraps a Python file object as a directory input/output stream.

    Tracks its own logical length: `size` for input streams (set from
    the on-disk file size) and a running byte count for output streams.
    """

    # BUGFIX: `size=0L` used the Python-2-only long literal; plain 0 is
    # behaviorally identical on Python 2 and keeps the module loadable
    # on Python 3.
    def __init__(self, name, fh, size=0):
        self.name = name
        self.fh = fh
        self._length = size
        self.isOpen = True

    def close(self, isClone=False):
        """Close the underlying file; no-op for clones or when already
        closed, so close() is safe to call more than once."""
        if isClone or not self.isOpen:
            return
        self.isOpen = False
        self.fh.close()

    def seek(self, pos):
        self.fh.seek(pos)

    def read(self, length, pos):
        """Read `length` bytes starting at absolute position `pos`."""
        self.fh.seek(pos)
        return self.fh.read(length)

    def write(self, buffer):
        """Append `buffer` and flush.

        NOTE(review): _length assumes strictly sequential writes; a
        seek() back followed by write() would over-count.  This appears
        to match how the stream is used by the directory — confirm.
        """
        self.fh.write(buffer)
        self.fh.flush()
        self._length += len(buffer)

    def length(self):
        return self._length
# Cache of already-opened directory instances, keyed by canonical path.
DIRECTORIES = {}
DIR_MUTEX = threading.Lock()
class PythonFileDirectory(object):
    """Filesystem-backed Lucene Directory implemented in Python.

    Index files live under `path`; lock files live under LOCK_DIR (the
    org.apache.lucene.lockDir property, falling back to the JVM temp
    dir, falling back to the index directory itself).
    """

    LOCK_DIR = PyLucene.System.getProperty(
        "org.apache.lucene.lockDir",
        PyLucene.System.getProperty("java.io.tmpdir"))

    @staticmethod
    def getDirectory(path, create=False):
        """Return the directory instance for the named location.

        Directories are cached, so that, for a given canonical path, the
        same instance will always be returned.  This permits
        synchronization on directories."""
        path = os.path.realpath(path)
        DIR_MUTEX.acquire()
        try:
            # (renamed local: `dir` shadowed the builtin)
            if path in DIRECTORIES:
                directory = DIRECTORIES[path]
            else:
                directory = PythonFileDirectory(path, create)
                DIRECTORIES[path] = directory
        finally:
            DIR_MUTEX.release()
        if directory and create:
            directory.create()
        return directory

    def __init__(self, path, create=False):
        self.path = os.path.realpath(path)
        self.name = self.path
        self.mutexRename = threading.Lock()
        self._locks = {}
        self._streams = []
        if not self.LOCK_DIR:
            # No configured lock dir: keep lock files in the index itself.
            self.LOCK_DIR = self.path
        if create:
            self.create()
        assert os.path.isdir(path)

    def create(self):
        """Create (or empty) the index directory and purge stale lock
        files left over from previous runs."""
        if not os.path.exists(self.path):
            os.makedirs(self.path)
        for oldFile in os.listdir(self.path):
            os.remove(os.path.join(self.path, oldFile))
        lockPrefix = self.getLockPrefix()
        for tmpFile in os.listdir(self.LOCK_DIR):
            if tmpFile.startswith(lockPrefix):
                os.remove(os.path.join(self.LOCK_DIR, tmpFile))

    def close(self):
        """Close every stream opened through this directory."""
        for stream in self._streams:
            stream.close()

    def createOutput(self, name):
        """Open `name` for writing and return a PythonFileStream."""
        # BUGFIX: binary mode -- index files are binary data; text mode
        # would corrupt them on platforms that translate line endings.
        file_path = os.path.join(self.path, name)
        fh = open(file_path, "wb")
        stream = PythonFileStream(name, fh)
        self._streams.append(stream)
        return stream

    def deleteFile(self, name):
        if self.fileExists(name):
            os.unlink(os.path.join(self.path, name))

    def fileExists(self, name):
        return os.path.exists(os.path.join(self.path, name))

    def fileLength(self, name):
        return os.path.getsize(os.path.join(self.path, name))

    def fileModified(self, name):
        return int(os.path.getmtime(os.path.join(self.path, name)))

    def list(self):
        return os.listdir(self.path)

    def openInput(self, name):
        """Open `name` for reading and return a PythonFileStream whose
        length is the current on-disk size."""
        # BUGFIX: binary mode, matching createOutput().
        file_path = os.path.join(self.path, name)
        fh = open(file_path, 'rb')
        stream = PythonFileStream(name, fh, os.path.getsize(file_path))
        self._streams.append(stream)
        return stream

    def renameFile(self, fname, tname):
        """Rename fname to tname, replacing any existing target; a mutex
        serializes renames within this directory instance."""
        self.mutexRename.acquire()
        try:
            fromName = os.path.join(self.path, fname)
            toName = os.path.join(self.path, tname)
            if os.path.exists(toName):
                os.remove(toName)
            os.rename(fromName, toName)
        finally:
            self.mutexRename.release()

    def touchFile(self, name):
        """Update the file's modification time by rewriting its first byte."""
        # BUGFIX: 'rw' is not a valid open() mode (it degrades to
        # read-only, so the write below raised IOError); 'r+b' opens for
        # update without truncating.
        file_path = os.path.join(self.path, name)
        fh = open(file_path, 'r+b')
        try:
            c = fh.read(1)
            fh.seek(0)
            fh.write(c)
        finally:
            fh.close()

    def makeLock(self, name):
        """Return (creating on first use) the named lock, with a lock
        file name unique to this directory."""
        lockFile = self.getLockPrefix() + '-' + name
        return self._locks.setdefault(
            name, PythonFileLock(self.LOCK_DIR, lockFile))

    def getHexDigest(self, string):
        # NOTE(review): the md5 module is Python 2 only; hashlib.md5 is
        # the 2.5+/3.x equivalent.
        return md5.new(string).hexdigest()

    def getLockPrefix(self):
        """Lock-file prefix derived from this directory's canonical path."""
        return 'lucene-' + self.getHexDigest(os.path.realpath(self.path))
# When DEBUG is on, replace the public classes with tracing factories so
# that every attribute access on directories, streams and locks is logged
# via DebugWrapper.
if DEBUG:
    _globals = globals()
    _globals['PythonFileDirectory'] = DebugFactory( PythonFileDirectory )
    _globals['PythonFileStream'] = DebugFactory( PythonFileStream )
    _globals['PythonFileLock'] = DebugFactory( PythonFileLock )
    del _globals
_______________________________________________
pylucene-dev mailing list
[email protected]
http://lists.osafoundation.org/mailman/listinfo/pylucene-dev