We are having a problem with Lucene in a high-concurrency create/delete/search situation. I thought I had fixed all of these problems, but I guess not.
Here's what's happening. We are conducting load testing on our application. On a Windows 2000 server running lucene-1.3-final with the compound file format enabled, a worker thread creates new Documents as it ingests content. Meanwhile, a test script is hitting the search part of our application. (I think the script also updates and deletes Documents, but I am not sure; my colleague who wrote it has left for the day, so I can't ask him.) The scripted test passes with 1, 5, and 10 users hitting the application. At 20 users, we get this exception:

[Task Worker1] ERROR com.ancept.ams.search.lucene.LuceneIndexer - Caught exception closing IndexReader in finally block
java.io.IOException: couldn't delete segments
        at org.apache.lucene.store.FSDirectory.renameFile(FSDirectory.java:236)
        at org.apache.lucene.index.SegmentInfos.write(SegmentInfos.java(Compiled Code))
        at org.apache.lucene.index.SegmentReader$1.doBody(SegmentReader.java:179)
        at org.apache.lucene.store.Lock$With.run(Lock.java:148)
        at org.apache.lucene.index.SegmentReader.doClose(SegmentReader.java(Compiled Code))
        at org.apache.lucene.index.IndexReader.close(IndexReader.java(Inlined Compiled Code))
        at org.apache.lucene.index.SegmentsReader.doClose(SegmentsReader.java(Compiled Code))
        at org.apache.lucene.index.IndexReader.close(IndexReader.java(Compiled Code))
        at com.ancept.ams.search.lucene.LuceneIndexer.delete(LuceneIndexer.java:266)

All write access to the index is controlled in the attached LuceneIndexer class by synchronizing on a static lock object. Searching is handled in another part of the code, which creates new IndexSearchers as necessary when the index changes. I do not rely on finalization to clean up these searchers because we found it to be unreliable. Instead, I keep a count of the threads using each searcher, and when that count drops to 0 I close the searcher if it is outdated. (A simplified sketch of that bookkeeping appears below, after my signature and before the attached indexer code.)

My problem seems similar to what Robert Leftwich asked about on this mailing list in January 2001:

Google Cache: http://64.233.179.104/search?q=cache:1D4h1vSh5AQJ:www.geocrawler.com/mail/msg.php3%3Fmsg_id%3D5020057++lucene+multithreading+problems+site:geocrawler.com&hl=en

Doug Cutting replied that he should synchronize calls to IndexReader.open() and IndexReader.close():

Google Cache: http://64.233.179.104/search?q=cache:arztiytQ42QJ:www.geocrawler.com/archives/3/2624/2001/1/0/5020870/++lucene+multithreading+problems+site:geocrawler.com&hl=en

Robert Leftwich then found a problem in his own code and eliminated a second IndexReader that was causing the trouble:

Google Cache: http://64.233.179.104/search?q=cache:jSIsi6t9KH8J:www.geocrawler.com/mail/msg.php3%3Fmsg_id%3D5037517++lucene+multithreading+problems+site:geocrawler.com&hl=en

However, there are differences between Leftwich's design and mine, and besides, that thread is four years old. (Do archives of lucene-user from 2001 even exist anywhere?) So any advice would be appreciated. Do I need to synchronize _all_ IndexReader.open() and IndexReader.close() calls? Or is it more likely that I'm missing something in my class that modifies the index? The code is attached.

Thank you,
Luke Francl
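Here is the simplified sketch of the searcher bookkeeping mentioned above. The class and method names are made up for this message (the real code lives in our search component), so please read it as an approximation of the logic rather than the actual implementation:

import java.io.IOException;

import org.apache.lucene.search.IndexSearcher;

/**
 * Sketch of how we manage each IndexSearcher: count the threads using
 * it, mark it outdated when the index changes, and close it only once
 * it is both outdated and unused.
 **/
public final class SearcherTracker {

    private final IndexSearcher searcher;
    private int users = 0;
    private boolean outdated = false;

    SearcherTracker( IndexSearcher searcher ) {
        this.searcher = searcher;
    }

    /** Called by a searching thread before it uses the searcher. **/
    synchronized IndexSearcher acquire() {
        users++;
        return searcher;
    }

    /** Called by a searching thread in a finally block when it is done. **/
    synchronized void release() throws IOException {
        users--;
        closeIfUnused();
    }

    /** Called when the index changes and a newer searcher replaces this one. **/
    synchronized void markOutdated() throws IOException {
        outdated = true;
        closeIfUnused();
    }

    private void closeIfUnused() throws IOException {
        // Close only when the searcher is stale and no thread is using it.
        if ( outdated && users == 0 ) {
            searcher.close();
        }
    }
}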
// $Id: LuceneIndexer.java 20473 2004-10-19 17:20:10Z lfrancl $

package com.ancept.ams.search.lucene;

import com.ancept.ams.asset.AssetUtils;
import com.ancept.ams.asset.AttributeValue;
import com.ancept.ams.asset.IAsset;
import com.ancept.ams.asset.IAssetIdentifier;
import com.ancept.ams.asset.IAssetList;
import com.ancept.ams.asset.ITimeMetadataAsset;
import com.ancept.ams.asset.IVideoAssetView;
import com.ancept.ams.controller.RelayFactory;
import com.ancept.ams.enums.AttributeNamespace;
import com.ancept.ams.enums.AttributeType;
import com.ancept.ams.enums.TimeMetadataType;
import com.ancept.ams.relay.IAssetRelay;
import com.ancept.ams.search.Indexer;
import com.ancept.ams.search.Fields;
import com.ancept.ams.util.SystemConfig;
import com.ancept.ams.util.PerformanceMonitor;

import org.apache.log4j.Logger;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.snowball.SnowballAnalyzer;
import org.apache.lucene.analysis.StopAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;

import java.io.File;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;

/**
 * Controls access to the Lucene index.
 *
 * @author Luke Francl
 **/
public final class LuceneIndexer implements Indexer {

    private static final Logger l4j = Logger.getLogger( LuceneIndexer.class );

    static File INDEX_DIRECTORY =
        new File( SystemConfig.getProperty( "LuceneIndexDir", "./index" ) );

    /**
     * For parsing Queries, you must use the same analyzer used to
     * index data.
     **/
    static Analyzer ANALYZER =
        new SnowballAnalyzer( "English", StopAnalyzer.ENGLISH_STOP_WORDS );

    /** ISO format without dashes for a more compact representation. **/
    static final DateFormat LUCENE_DATE_FORMAT =
        new SimpleDateFormat( "yyyyMMdd" );

    /**
     * This object is used to synchronize index write operations.
     *
     * AMS uses Lucene with incremental updates to the index each time
     * an asset is changed.
     *
     * Since changes cannot be flushed to the index without closing
     * the IndexWriter, each method in this class creates a new IndexWriter
     * (or, in the case of delete, an IndexReader). Because two threads
     * cannot write to the same index at the same time, this locking is
     * necessary.
     **/
    private static final Object lock = new Object();

    /**
     * initializeIndex will set this if there is an error. Declared
     * before the instance field so its "= null" initializer runs first;
     * declared after it, this initializer would overwrite the value set
     * by the constructor during static initialization.
     **/
    private static Exception initializationException = null;

    private static LuceneIndexer instance = new LuceneIndexer();

    private LuceneIndexer() {
        initializeIndex();
    }

    public static LuceneIndexer getInstance() {
        if ( initializationException != null ) {
            throw new RuntimeException( "Initialization of Lucene Indexer failed. "
                                        + "Exception was: " + initializationException );
        }
        return instance;
    }

    void optimize() {
        synchronized( lock ) {
            IndexWriter writer = null;
            try {
                writer = new IndexWriter( INDEX_DIRECTORY, ANALYZER, false );
                // Reduces number of files required for an index.
                // See bug 16527
                writer.setUseCompoundFile( true );
                writer.optimize();
            } catch ( IOException e ) {
                l4j.error( "Caught IOException trying to optimize index", e );
            } finally {
                try {
                    if ( writer != null ) {
                        writer.close();
                    }
                } catch ( IOException e ) {
                    l4j.error( "Exception closing IndexWriter "
                               + "in finally block", e );
                }
            }
        }
    }

    public void index( List deletes, List inserts ) {
        delete( deletes );
        index( inserts );
    }

    private void index( List inserts ) {
        if ( inserts.isEmpty() ) {
            return;
        }

        List documents = new ArrayList();
        for ( Iterator iter = inserts.iterator(); iter.hasNext(); ) {
            if ( PerformanceMonitor.ENABLED ) {
                PerformanceMonitor.start( PerformanceMonitor.TEXT_INDEX_CATEGORY,
                                          "Create index document" );
            }
            documents.add( createDocument( (IAsset) iter.next() ) );
            if ( PerformanceMonitor.ENABLED ) {
                PerformanceMonitor.stop( PerformanceMonitor.TEXT_INDEX_CATEGORY,
                                         "Create index document" );
            }
        }

        IndexWriter writer = null;

        // Based on a misreading of http://www.onjava.com/lpt/a/3273 I
        // had this synchronization only around the addDocument method.
        // However, that only works for multiple threads sharing the
        // same IndexWriter, not the same index as indicated by the
        // article.
        synchronized( lock ) {
            try {
                writer = new IndexWriter( INDEX_DIRECTORY, ANALYZER, false );
                // Reduces number of files required for an index.
                // See bug 16527
                writer.setUseCompoundFile( true );
                for ( Iterator docs = documents.iterator(); docs.hasNext(); ) {
                    Document assetDocument = (Document) docs.next();
                    if ( PerformanceMonitor.ENABLED ) {
                        PerformanceMonitor.start( PerformanceMonitor.TEXT_INDEX_CATEGORY,
                                                  "Index document" );
                    }
                    writer.addDocument( assetDocument );
                    if ( PerformanceMonitor.ENABLED ) {
                        PerformanceMonitor.stop( PerformanceMonitor.TEXT_INDEX_CATEGORY,
                                                 "Index document" );
                    }
                }
            } catch ( IOException e ) {
                l4j.error( "Could not add some of " + inserts + " to index", e );
                if ( PerformanceMonitor.ENABLED ) {
                    PerformanceMonitor.abort( PerformanceMonitor.TEXT_INDEX_CATEGORY,
                                              "Indexing assets" );
                }
                return;
            } finally {
                try {
                    if ( writer != null ) {
                        writer.close();
                    }
                } catch ( IOException e ) {
                    l4j.error( "Caught exception closing IndexWriter "
                               + "in finally block", e );
                }
            }
        }
    }

    private void delete( List deletes ) {
        if ( deletes.isEmpty() ) {
            return;
        }

        IndexReader reader = null;
        synchronized( lock ) {
            try {
                reader = IndexReader.open( INDEX_DIRECTORY );
                for ( Iterator iter = deletes.iterator(); iter.hasNext(); ) {
                    IAsset asset = (IAsset) iter.next();
                    if ( l4j.isDebugEnabled() ) {
                        l4j.debug( "deleting asset " + asset.getId() + " from index" );
                    }
                    Term assetIdTerm =
                        new Term( Fields.AMS_ASSET_ID, asset.getId().toString() );
                    if ( PerformanceMonitor.ENABLED ) {
                        PerformanceMonitor.start( PerformanceMonitor.TEXT_INDEX_CATEGORY,
                                                  "Delete asset" );
                    }
                    reader.delete( assetIdTerm );
                    if ( PerformanceMonitor.ENABLED ) {
                        PerformanceMonitor.stop( PerformanceMonitor.TEXT_INDEX_CATEGORY,
                                                 "Delete asset" );
                    }
                }
            } catch ( IOException e ) {
                l4j.error( "Could not remove some of assets " + deletes
                           + " from index.", e );
                if ( PerformanceMonitor.ENABLED ) {
                    PerformanceMonitor.abort( PerformanceMonitor.TEXT_INDEX_CATEGORY,
                                              "Delete asset" );
                }
                return;
            } finally {
                try {
                    if ( reader != null ) {
                        reader.close();
                    }
                } catch ( IOException e ) {
                    l4j.error( "Caught exception closing IndexReader "
                               + "in finally block", e );
                }
            }
        }
    }

    /**
     * Iterates through all the Documents in the index and marks them
     * deleted.
     **/
    public void deleteAll() {
        synchronized( lock ) {
            IndexReader reader = null;
            try {
                reader = IndexReader.open( INDEX_DIRECTORY );
            } catch ( IOException e ) {
                l4j.error( "Couldn't open index, can't delete all", e );
                return;
            }

            for ( int docNum = 0; docNum < reader.maxDoc(); docNum++ ) {
                try {
                    reader.delete( docNum );
                } catch ( IOException e ) {
                    l4j.error( "Caught IOException attempting to "
                               + "delete document number " + docNum, e );
                }
            }

            try {
                reader.close();
            } catch ( IOException e ) {
                l4j.error( "Couldn't close IndexReader", e );
            }
        }
    }

    /**
     * Iterates through the index removing assets which have been
     * deleted from CM. This method requires a read-only CM
     * transaction.
     **/
    public void pruneIndex() {
        IAssetRelay assetRelay = RelayFactory.getAssetRelay();
        synchronized( lock ) {
            IndexReader reader = null;
            try {
                try {
                    reader = IndexReader.open( INDEX_DIRECTORY );
                } catch ( IOException e ) {
                    l4j.error( "Couldn't open index, can't prune it.", e );
                    return;
                }

                for ( int docNum = 0; docNum < reader.maxDoc(); docNum++ ) {
                    if ( !reader.isDeleted( docNum ) ) {
                        Document doc = reader.document( docNum );
                        String assetId = doc.get( Fields.AMS_ASSET_ID );
                        if ( assetId == null ) {
                            // Nothing to look up without an asset ID, so
                            // delete the document and skip the existence
                            // check below.
                            reader.delete( docNum );
                            continue;
                        }
                        IAssetIdentifier identifier =
                            assetRelay.createIdentifier( assetId );
                        if ( !assetRelay.assetExists( identifier ) ) {
                            reader.delete( docNum );
                        }
                    }
                }
            } catch ( IOException e ) {
                l4j.error( "IOException pruning index", e );
            } finally {
                try {
                    if ( reader != null ) {
                        reader.close();
                    }
                } catch ( IOException e ) {
                    l4j.error( "Caught IOException closing reader "
                               + "in finally block.", e );
                }
            }
        }
    }

    private Document createDocument( IAsset asset ) {
        Document assetDocument = new Document();

        assetDocument.add( Field.Keyword( Fields.AMS_ASSET_ID,
                                          asset.getId().toString() ) );
        assetDocument.add( Field.Keyword( Fields.AMS_ITEM_TYPE,
                                          asset.getEntity().getName() ) );
        if ( l4j.isDebugEnabled() ) {
            l4j.debug( "indexing " + asset.getId()
                       + " entity = " + asset.getEntity().getName() );
        }
        assetDocument.add( Field.Keyword( Fields.AMS_DATE_INDEXED, new Date() ) );

        StringBuffer allContent = new StringBuffer();

        for ( Iterator attributeValues = asset.getAttributeValues();
              attributeValues.hasNext(); ) {
            AttributeValue attributeValue =
                (AttributeValue) attributeValues.next();
            String fieldName =
                attributeValue.getAttribute().getDatastoreName();

            if ( attributeValue.getValue() == null ) {
                continue;
            }

            if ( l4j.isDebugEnabled() ) {
                l4j.debug( "Indexing '" + fieldName + "' = '"
                           + attributeValue.getValue() + "'" );
            }

            if ( attributeValue.getType().equals( AttributeType.DATE )
                 || attributeValue.getType().equals( AttributeType.TIMESTAMP ) ) {
                // bug 16562: until Lucene fixes indexing of dates before
                // 1970, they'll be stored in ISO yyyyMMdd format.
                String dateString =
                    LUCENE_DATE_FORMAT.format( attributeValue.getDate() );
                assetDocument.add( Field.UnStored( fieldName, dateString ) );
            } else if ( attributeValue.getType().equals( AttributeType.ENUM )
                        || attributeValue.getAttribute().isControlledVocabulary()
                        || attributeValue.getAttribute().isTaxonomy() ) {
                // Enums, controlled vocabularies, and taxonomies need
                // to be stored verbatim.
                assetDocument.add( Field.Keyword( fieldName,
                                                  attributeValue.getValue().toString() ) );
            } else if ( attributeValue.getAttribute().getName().equals( "ACLCODE" ) ) {
                // fix for bug 17105: ACLCODE should not be tokenized.
                assetDocument.add( Field.Keyword( fieldName,
                                                  attributeValue.getValue().toString() ) );
            } else if ( attributeValue.getValue() != null
                        && !attributeValue.getValue().equals( "" ) ) {
                assetDocument.add( Field.UnStored( fieldName,
                                                   attributeValue.getValue().toString() ) );
            }

            allContent.append( " " )
                      .append( blankNull( attributeValue.getValue() ) )
                      .append( " " );
        }

        if ( AssetUtils.getOriginal( asset ) != null ) {
            String filename = AssetUtils.getOriginal( asset ).getFilename();
            if ( filename != null ) {
                if ( l4j.isDebugEnabled() ) {
                    l4j.debug( "Indexing '" + Fields.AMS_FILENAME + "' = '"
                               + filename + "'" );
                }
                assetDocument.add( Field.Keyword( Fields.AMS_FILENAME, filename ) );
                allContent.append( filename );
            }
        }

        if ( !asset.getVideoAssetView().isEmpty() ) {
            IVideoAssetView view = asset.getVideoAssetView();
            for ( int index = (int) TimeMetadataType.ANNOTATION.getId();
                  index < TimeMetadataType.ALL.length;
                  index++ ) {
                IAssetList timeMetadata =
                    view.getTimeMetadata( TimeMetadataType.ALL[ index ] );
                for ( Iterator metadata = timeMetadata.iterator();
                      metadata.hasNext(); ) {
                    ITimeMetadataAsset timeMetadatum =
                        (ITimeMetadataAsset) metadata.next();
                    String timeMetaFieldName =
                        Fields.getField( timeMetadatum.getType() );
                    if ( timeMetaFieldName == null ) {
                        continue;
                    }
                    if ( timeMetadatum.getValue() == null ) {
                        continue;
                    }
                    if ( l4j.isDebugEnabled() ) {
                        l4j.debug( "Indexing '" + timeMetaFieldName + "' = '"
                                   + timeMetadatum.getValue() + "'" );
                    }
                    // NOTE: I'm using Field.Text because Field.UnStored
                    // didn't return even the field NAME with the hits!
                    assetDocument.add( Field.Text( timeMetaFieldName,
                                                   timeMetadatum.getValue() ) );
                    // next, store the asset ID
                    assetDocument.add( Field.UnIndexed( timeMetaFieldName + "_ID",
                                                        timeMetadatum.getId().toString() ) );
                    allContent.append( " " )
                              .append( timeMetadatum.getValue() )
                              .append( " " );
                }
            }
        }

        assetDocument.add( Field.UnStored( Fields.AMS_ALL_CONTENT,
                                           allContent.toString() ) );
        return assetDocument;
    }

    private void initializeIndex() {
        IndexWriter writer = null;
        try {
            // TODO: put in some error checking for indexDirectory
            if ( !INDEX_DIRECTORY.exists() ) {
                INDEX_DIRECTORY.mkdirs();
            }
            // create a new index if there are no files in the index dir
            if ( !IndexReader.indexExists( INDEX_DIRECTORY ) ) {
                writer = new IndexWriter( INDEX_DIRECTORY, ANALYZER, true );
            }
        } catch ( IOException e ) {
            l4j.error( "Couldn't initialize Lucene index. Indexing will fail.", e );
            initializationException = e;
        } finally {
            try {
                if ( writer != null ) {
                    writer.close();
                }
            } catch ( IOException e ) {
                l4j.error( "Caught exception closing IndexWriter "
                           + "in finally block", e );
            }
        }
    }

    private String blankNull( Object value ) {
        return value == null ? "" : value.toString();
    }
}
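P.S. In case the calling pattern matters: the ingest worker drives the indexer through the singleton, roughly as in this hypothetical snippet (in production the lists hold IAsset objects built from the ingest queue, not the empty lists shown here):

import java.util.ArrayList;
import java.util.List;

import com.ancept.ams.search.lucene.LuceneIndexer;

public class IngestWorkerExample {
    public static void main( String[] args ) {
        // Hypothetical driver code: deletes are processed first, then
        // inserts; each batch synchronizes on LuceneIndexer's static
        // lock object internally.
        List deletes = new ArrayList();
        List inserts = new ArrayList();
        LuceneIndexer.getInstance().index( deletes, inserts );
    }
}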