We are having a problem with Lucene in a high-concurrency
create/delete/search situation. I thought I had fixed all these problems,
but I guess not.

Here's what's happening.

We are conducting load testing on our application.

On a Windows 2000 server using lucene-1.3-final with compound file
enabled, a worker thread is creating new Documents as it ingests
content. Meanwhile, a test script is running that hits the search
part of our application (I think the script also updates and deletes
Documents, but I am not sure; my colleague who wrote it has left for the
day, so I can't ask him).

The scripted test passes with 1, 5, and 10 users hitting the
application. At 20 users, we get this exception:

[Task Worker1] ERROR com.ancept.ams.search.lucene.LuceneIndexer  - Caught exception closing IndexReader in finally block
java.io.IOException: couldn't delete segments
        at org.apache.lucene.store.FSDirectory.renameFile(FSDirectory.java:236)
        at org.apache.lucene.index.SegmentInfos.write(SegmentInfos.java(Compiled Code))
        at org.apache.lucene.index.SegmentReader$1.doBody(SegmentReader.java:179)
        at org.apache.lucene.store.Lock$With.run(Lock.java:148)
        at org.apache.lucene.index.SegmentReader.doClose(SegmentReader.java(Compiled Code))
        at org.apache.lucene.index.IndexReader.close(IndexReader.java(Inlined Compiled Code))
        at org.apache.lucene.index.SegmentsReader.doClose(SegmentsReader.java(Compiled Code))
        at org.apache.lucene.index.IndexReader.close(IndexReader.java(Compiled Code))
        at com.ancept.ams.search.lucene.LuceneIndexer.delete(LuceneIndexer.java:266)

All write access to the index is controlled in that LuceneIndexer class
by synchronizing on a static lock object. 

Searching is handled in another part of the code, which creates new
IndexSearchers as necessary when the index changes. I do not rely on
finalization to clean up these searchers because we found it to be
unreliable. I keep track of threads using each searcher and then close
it when that number drops to 0 if the searcher is outdated. 
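
A minimal sketch of that kind of bookkeeping follows. It is not the
actual application code: the class and method names are hypothetical,
and a plain java.io.Closeable stands in for IndexSearcher so the sketch
compiles without Lucene on the classpath.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;

/**
 * Sketch only: counts the threads using one searcher and closes it
 * once it is both outdated and unused. A Closeable stands in for
 * IndexSearcher (an assumption made to keep this self-contained).
 */
final class RefCountedSearcher {

    private final Closeable searcher;
    private int users = 0;
    private boolean outdated = false;
    private boolean closed = false;

    RefCountedSearcher( Closeable searcher ) {
        this.searcher = searcher;
    }

    /** Returns false once outdated; the caller must then fetch a newer searcher. */
    synchronized boolean acquire() {
        if ( outdated ) {
            return false;
        }
        users++;
        return true;
    }

    /** Call exactly once per successful acquire(), ideally in a finally block. */
    synchronized void release() {
        if ( users == 0 ) {
            throw new IllegalStateException( "release() without acquire()" );
        }
        users--;
        closeIfIdle();
    }

    /** Call when the index has changed and a replacement searcher exists. */
    synchronized void markOutdated() {
        outdated = true;
        closeIfIdle();
    }

    synchronized boolean isClosed() {
        return closed;
    }

    // Only called while holding this object's monitor.
    private void closeIfIdle() {
        if ( outdated && users == 0 && !closed ) {
            closed = true;
            try {
                searcher.close();
            } catch ( IOException e ) {
                throw new UncheckedIOException( e );
            }
        }
    }

    public static void main( String[] args ) {
        RefCountedSearcher holder =
            new RefCountedSearcher( () -> System.out.println( "searcher closed" ) );
        holder.acquire();       // a search thread starts using the searcher
        holder.markOutdated();  // index changed; still in use, so not closed yet
        holder.release();       // last user is done, so the searcher is closed here
    }
}
```

Note that in a scheme like this the close() runs under the holder's own
monitor, not under the static lock in LuceneIndexer, so closing a
searcher is not serialized with index writes.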

My problem seems similar to what Robert Leftwich asked about on this
mailing list in January 2001.  

Google Cache:
http://64.233.179.104/search?q=cache:1D4h1vSh5AQJ:www.geocrawler.com/mail/msg.php3%3Fmsg_id%3D5020057++lucene+multithreading+problems+site:geocrawler.com&hl=en

Doug Cutting replied to him saying that he should synchronize calls to
IndexReader.open() and IndexReader.close():

Google Cache:
http://64.233.179.104/search?q=cache:arztiytQ42QJ:www.geocrawler.com/archives/3/2624/2001/1/0/5020870/++lucene+multithreading+problems+site:geocrawler.com&hl=en
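
Applied to an application like ours, that advice might look roughly
like the sketch below: every open and every close, from the indexing
code and the search code alike, goes through one shared lock. The class
IndexOpenCloseGate, its Opener interface, and the LOCK field are
hypothetical names invented for illustration; stand-in types are used
instead of IndexReader so the sketch compiles without Lucene.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;

/**
 * Sketch only: funnels all open and close calls through a single lock.
 * Nothing here is Lucene API; Opener stands in for a call such as
 * IndexReader.open( INDEX_DIRECTORY ) and Closeable for the reader.
 */
final class IndexOpenCloseGate {

    /** One lock shared by every open and close in the application. */
    static final Object LOCK = new Object();

    /** Hypothetical stand-in for the code that opens a reader or searcher. */
    interface Opener<T> {
        T open() throws IOException;
    }

    static <T> T open( Opener<T> opener ) {
        synchronized ( LOCK ) {
            try {
                return opener.open();
            } catch ( IOException e ) {
                throw new UncheckedIOException( e );
            }
        }
    }

    static void close( Closeable resource ) {
        synchronized ( LOCK ) {
            try {
                resource.close();
            } catch ( IOException e ) {
                throw new UncheckedIOException( e );
            }
        }
    }

    public static void main( String[] args ) {
        String reader = open( () -> "pretend reader" );
        close( () -> System.out.println( "closed " + reader ) );
    }
}
```

This only helps if no open or close bypasses the gate. In the attached
class, IndexReader.open() and reader.close() already run inside
synchronized( lock ), but the searchers are opened and closed elsewhere
without that lock. The stack trace shows close() rewriting the segments
file, so an unsynchronized open at that moment is one plausible cause,
since Windows generally refuses to delete or rename a file that is
still open.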

Robert Leftwich then found a problem with his code and eliminated a
second IndexReader that was messing stuff up:

Google Cache:
http://64.233.179.104/search?q=cache:jSIsi6t9KH8J:www.geocrawler.com/mail/msg.php3%3Fmsg_id%3D5037517++lucene+multithreading+problems+site:geocrawler.com&hl=en

However, there are differences between Leftwich's design and mine, and
besides, that thread is four years old. (Are there even existing
archives for lucene-user throughout 2001 anywhere?)

So any advice would be appreciated.

Do I need to synchronize _all_ IndexReader.open() and
IndexReader.close() calls? Or is it more likely that I'm missing
something in my class that modifies the index? The code is attached.

Thank you,

Luke Francl
// $Id: LuceneIndexer.java 20473 2004-10-19 17:20:10Z lfrancl $
package com.ancept.ams.search.lucene;

import com.ancept.ams.asset.AssetUtils;
import com.ancept.ams.asset.AttributeValue;
import com.ancept.ams.asset.IAsset;
import com.ancept.ams.asset.IAssetIdentifier;
import com.ancept.ams.asset.IAssetList;
import com.ancept.ams.asset.ITimeMetadataAsset;
import com.ancept.ams.asset.IVideoAssetView;
import com.ancept.ams.controller.RelayFactory;
import com.ancept.ams.enums.AttributeNamespace;
import com.ancept.ams.enums.AttributeType;
import com.ancept.ams.enums.TimeMetadataType;
import com.ancept.ams.relay.IAssetRelay;
import com.ancept.ams.search.Indexer;
import com.ancept.ams.search.Fields;
import com.ancept.ams.util.SystemConfig;
import com.ancept.ams.util.PerformanceMonitor;

import org.apache.log4j.Logger;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.snowball.SnowballAnalyzer;
import org.apache.lucene.analysis.StopAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;

import java.io.File;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;

/**
 * Controls access to the Lucene index.
 *
 * @author Luke Francl
 **/
public final class LuceneIndexer implements Indexer {

    private static final Logger l4j = Logger.getLogger( LuceneIndexer.class );

    static File INDEX_DIRECTORY =
        new File( SystemConfig.getProperty( "LuceneIndexDir", "./index" ) );

    /**
     * For parsing Queries, you must use the same analyser used to
     * index data.
     **/
    static Analyzer ANALYZER =
        new SnowballAnalyzer( "English",
                              StopAnalyzer.ENGLISH_STOP_WORDS );

    private static LuceneIndexer instance = new LuceneIndexer();

    /** ISO format without dashes for a more compact representation. **/
    static final DateFormat LUCENE_DATE_FORMAT =
        new SimpleDateFormat( "yyyyMMdd" );

    /**
     * This object is used to synchronize index write operations.
     *
     * AMS uses Lucene with incremental updates to the index each time
     * an asset is changed.
     *
     * Since changes cannot be flushed to the index without closing
     * the IndexWriter, each method in this class creates a new IndexWriter
     * (or, in the case of delete, IndexReader). Because two threads cannot
     * write to the same index at the same time, this locking is necessary.
     **/
    private static final Object lock = new Object();

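    /**
     * The initializeIndex will set this if there is an error.
     *
     * Deliberately has no explicit initializer: "instance" is created
     * by an earlier static initializer, so an explicit "= null" here
     * would run afterwards and erase any exception recorded by the
     * constructor.
     **/
    private static Exception initializationException;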

    private LuceneIndexer() {
        initializeIndex();
    }

    public static LuceneIndexer getInstance() {

        if ( initializationException != null ) {
            throw new RuntimeException
                ( "Initialization of Lucene Indexer failed. " +
                  "Exception was:" + initializationException );
        }

        return instance;
    }

    void optimize() {

        synchronized( lock ) {

            IndexWriter writer = null;

            try {
                writer = new IndexWriter( INDEX_DIRECTORY,
                                          ANALYZER,
                                          false );
                // Reduces number of files required for an index.
                // See bug 16527
                writer.setUseCompoundFile( true );
                writer.optimize();

            } catch ( IOException e ) {
                l4j.error( "Caught IOException trying to optimize index", e );
            } finally {

                try {
                    if ( writer != null ) {
                        writer.close();
                    }
                } catch ( IOException e ) {
                    l4j.error( "Exception closing IndexWriter " +
                               "in finally block", e );

                }
            }
        }
    }

    public void index( List deletes, List inserts ) {
            delete( deletes );
            index( inserts );
    }

    private void index( List inserts ) {
        if ( inserts.isEmpty() ) {
            return;
        }


        List documents = new ArrayList();

        for ( Iterator iter = inserts.iterator(); iter.hasNext(); ) {
            if ( PerformanceMonitor.ENABLED ) {
                PerformanceMonitor.start( PerformanceMonitor.TEXT_INDEX_CATEGORY,
                                          "Create index document" );
            }
            documents.add( createDocument( (IAsset) iter.next() ) );
            if ( PerformanceMonitor.ENABLED ) {
                PerformanceMonitor.stop( PerformanceMonitor.TEXT_INDEX_CATEGORY,
                                          "Create index document" );
            }
        }


        IndexWriter writer = null;

        // Based on a mis-read of http://www.onjava.com/lpt/a/3273 I
        // had this synchronization only around the addDocument
        // method. However, that only works for multiple threads
        // sharing the same IndexWriter, not "index" as indicated by
        // the article.
        synchronized( lock ) {

            try {
                writer = new IndexWriter( INDEX_DIRECTORY,
                                          ANALYZER,
                                          false );

                // Reduces number of files required for an index.
                // See bug 16527
                writer.setUseCompoundFile( true );


                for ( Iterator docs = documents.iterator(); docs.hasNext(); ) {

                    Document assetDocument = (Document) docs.next();

                    if ( PerformanceMonitor.ENABLED ) {
                        PerformanceMonitor.start( PerformanceMonitor.TEXT_INDEX_CATEGORY,
                                                  "Index document" );
                    }
                    writer.addDocument( assetDocument );
                    if ( PerformanceMonitor.ENABLED ) {
                        PerformanceMonitor.stop( PerformanceMonitor.TEXT_INDEX_CATEGORY,
                                                 "Index document" );
                    }
                }

            } catch ( IOException e ) {
                l4j.error( "Could not add some of  " +
                           inserts + " to index", e );

                if ( PerformanceMonitor.ENABLED ) {
                    PerformanceMonitor.abort
                        ( PerformanceMonitor.TEXT_INDEX_CATEGORY,
                          "Indexing assets" );
                }

                return;
            } finally {

                try {
                    if ( writer != null ) {
                        writer.close();
                    }
                } catch ( IOException e ) {
                    l4j.error( "Caught exception closing IndexWriter " +
                               "in finally block", e );
                }
            }
        }
    }

    private void delete( List deletes ) {

        if ( deletes.isEmpty() ) {
            return;
        }

        IndexReader reader = null;

        synchronized( lock ) {
            try {

                reader = IndexReader.open( INDEX_DIRECTORY );

                for ( Iterator iter = deletes.iterator(); iter.hasNext(); ) {

                    IAsset asset = (IAsset) iter.next();

                    if ( l4j.isDebugEnabled() ) {
                        l4j.debug( "deleting asset " + asset.getId() +
                                   " from index" );
                    }

                    Term assetIdTerm = new Term( Fields.AMS_ASSET_ID,
                                                 asset.getId().toString() );

                    if ( PerformanceMonitor.ENABLED ) {
                        PerformanceMonitor.start( PerformanceMonitor.TEXT_INDEX_CATEGORY,
                                                  "Delete asset" );
                    }
                    reader.delete( assetIdTerm );
                    if ( PerformanceMonitor.ENABLED ) {
                        PerformanceMonitor.stop( PerformanceMonitor.TEXT_INDEX_CATEGORY,
                                                 "Delete asset" );
                    }
                }

            } catch ( IOException e ) {
                l4j.error( "Could not remove some of assets " +
                           deletes + " from index." , e );

                if ( PerformanceMonitor.ENABLED ) {
                    PerformanceMonitor.abort
                        ( PerformanceMonitor.TEXT_INDEX_CATEGORY,
                          "Delete asset" );

                }

                return;
            } finally {

                try {
                    if ( reader != null ) {
                        reader.close();
                    }
                } catch ( IOException e ) {
                    l4j.error( "Caught exception closing IndexReader " +
                               "in finally block", e );
                }
            }

        }

    }


    /**
     * Iterates through all the Documents in the index and marks them deleted.
     **/
    public void deleteAll() {

        synchronized( lock ) {

            IndexReader reader = null;

            try {
                reader = IndexReader.open( INDEX_DIRECTORY );
            } catch ( IOException e ) {
                l4j.error( "Couldn't open index, can't delete all", e );

                return;
            }

            for ( int docNum = 0; docNum < reader.maxDoc(); docNum++ ) {

                try {

                    reader.delete( docNum );

                } catch ( IOException e ) {
                    l4j.error( "Caught IOException attempting to " +
                               "delete document number " + docNum, e );
                }
            }

            try {
                reader.close();
            } catch ( IOException e ) {
                l4j.error( "Couldn't close IndexReader", e );
            }
        }
    }

    /**
     * Iterates through the index removing assets which have been
     * deleted from CM. This method requires a read only CM
     * transaction.
     **/
    public void pruneIndex() {

        IAssetRelay assetRelay = RelayFactory.getAssetRelay();

        synchronized( lock ) {
            IndexReader reader = null;

            try {
                try {
                    reader = IndexReader.open( INDEX_DIRECTORY );
                } catch ( IOException e ) {
                    l4j.error( "Couldn't open index, can't prune it.", e );

                    return;
                }

                for ( int docNum = 0; docNum < reader.maxDoc(); docNum++ ) {

                    if ( ! reader.isDeleted( docNum ) ) {

                        Document doc = reader.document( docNum );

                        String assetId = doc.get( Fields.AMS_ASSET_ID );

                        if ( assetId == null ) {
                            // No asset ID to look up, so remove the
                            // document and move on to the next one.
                            reader.delete( docNum );
                            continue;
                        }

                        IAssetIdentifier identifier =
                            assetRelay.createIdentifier( assetId );

                        if ( !assetRelay.assetExists( identifier ) ) {
                            reader.delete( docNum );
                        }
                    }
                }
            } catch ( IOException e ) {
                l4j.error( "IOException pruning index", e );
            } finally {
                try {
                    if ( reader != null ) {
                        reader.close();
                    }
                } catch ( IOException e ) {
                    l4j.error( "Caught IOException closing reader " +
                               "in finally block.", e );
                }
            }
        }

    }

    private Document createDocument( IAsset asset ) {
        Document assetDocument = new Document();

        assetDocument.add( Field.Keyword( Fields.AMS_ASSET_ID,
                                          asset.getId().toString() ) );

        assetDocument.add
            ( Field.Keyword( Fields.AMS_ITEM_TYPE,
                             asset.getEntity().getName() ) );

        if( l4j.isDebugEnabled() ) {
            l4j.debug( "indexing " + asset.getId() +
                       " entity =  " + asset.getEntity().getName() );
        }

        assetDocument.add( Field.Keyword( Fields.AMS_DATE_INDEXED,
                                          new Date() ) );

        StringBuffer allContent = new StringBuffer();

        for ( Iterator attributeValues =
                  asset.getAttributeValues();
              attributeValues.hasNext(); ) {


            AttributeValue attributeValue =
                (AttributeValue) attributeValues.next();

            String fieldName = attributeValue.getAttribute().
                getDatastoreName();

            if ( attributeValue.getValue() == null ) {
                continue;
            }

            if ( l4j.isDebugEnabled() ) {
                l4j.debug( "Indexing '" + fieldName + "' = '" +
                           attributeValue.getValue() + "'" );
            }

            if ( ( attributeValue.getType().
                   equals( AttributeType.DATE ) ||
                       attributeValue.getType().
                   equals( AttributeType.TIMESTAMP ) ) ) {

                // bug 16562: until Lucene fixes indexing dates before 1970,
                // they'll be stored in ISO yyyyMMdd format.

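                // SimpleDateFormat is not thread-safe and this method
                // runs outside the index lock, so guard the shared
                // formatter.
                String dateString;
                synchronized ( LUCENE_DATE_FORMAT ) {
                    dateString =
                        LUCENE_DATE_FORMAT.format( attributeValue.getDate() );
                }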

                assetDocument.add
                    ( Field.UnStored( fieldName,
                                      dateString ) );

            } else if ( attributeValue.getType().
                        equals( AttributeType.ENUM ) ||
                        attributeValue.getAttribute().
                        isControlledVocabulary() ||
                        attributeValue.getAttribute().isTaxonomy() ) {

                // Enums, controlled vocabularies, and taxonomies need
                // to be stored verbatim.

                assetDocument.add
                    ( Field.Keyword
                      ( fieldName,
                        attributeValue.getValue().toString() ) );

            } else if ( attributeValue.getAttribute().
                        getName().equals( "ACLCODE" ) ) {

                // fix for bug 17105: ACLCODE should not be tokenized.

                assetDocument.add
                    ( Field.Keyword
                      ( fieldName,
                        attributeValue.getValue().toString() ) );

            } else if ( attributeValue.getValue() != null &&
                        !attributeValue.getValue().equals( "" ) ) {

                assetDocument.add
                    ( Field.UnStored
                      ( fieldName,
                        attributeValue.getValue().toString() ) );
            }

            allContent.append( " " ).
                append( blankNull( attributeValue.getValue() ) ).
                append( " " );
        }

        if ( AssetUtils.getOriginal( asset ) != null ) {

            String filename =
                AssetUtils.getOriginal( asset ).getFilename();

            if ( filename != null ) {

                if ( l4j.isDebugEnabled() ) {
                    l4j.debug( "Indexing '" + Fields.AMS_FILENAME +
                               "' = '" + filename );
                }

                assetDocument.add( Field.Keyword( Fields.AMS_FILENAME,
                                                  filename ) );

                allContent.append( filename );
            }
        }

        if ( !asset.getVideoAssetView().isEmpty() ) {

            IVideoAssetView view = asset.getVideoAssetView();

            for ( int index = (int) TimeMetadataType.ANNOTATION.getId();
                  index < TimeMetadataType.ALL.length; index++ ) {

                IAssetList timeMetadata =  view.getTimeMetadata
                    ( TimeMetadataType.ALL[ index ] );

                    for ( Iterator metadata = timeMetadata.iterator();
                          metadata.hasNext(); ) {

                        ITimeMetadataAsset timeMetadatum =
                            (ITimeMetadataAsset) metadata.next();

                        String timeMetaFieldName =
                            Fields.getField( timeMetadatum.getType() );

                        if ( timeMetaFieldName == null ) {
                            continue;
                        }

                        if ( timeMetadatum.getValue() == null ) {
                            continue;
                        }

                        if ( l4j.isDebugEnabled() ) {
                            l4j.debug( "Indexing '" + timeMetaFieldName +
                                       "' = '" + timeMetadatum.getValue() );
                        }

                        // NOTE: I'm using Field.Text because
                        // Field.UnStored didn't return even the
                        // field NAME with the hits!
                        assetDocument.add
                            ( Field.Text( timeMetaFieldName,
                                          timeMetadatum.getValue() ) );

                        // next, store the asset ID
                        assetDocument.add
                            ( Field.UnIndexed
                              ( timeMetaFieldName + "_ID",
                                timeMetadatum.getId().toString() ) );


                        allContent.append( " " ).
                            append( timeMetadatum.getValue() ).
                            append( " " );
                    }

                }
            }

        assetDocument.add( Field.UnStored( Fields.AMS_ALL_CONTENT,
                                           allContent.toString() ) );

        return assetDocument;

    }

    private void initializeIndex() {

        IndexWriter writer = null;

        try {
            // TODO: put in some error checking for indexDirectory

            if ( !INDEX_DIRECTORY.exists() ) {
                INDEX_DIRECTORY.mkdirs();
            }

            // create a new index if there's no files in the index dir
            if ( ! IndexReader.indexExists( INDEX_DIRECTORY ) ) {
                writer = new IndexWriter( INDEX_DIRECTORY,
                                          ANALYZER,
                                          true );
            }

        } catch ( IOException e ) {
            l4j.error( "Couldn't initialize Lucene Index. Indexing will fail.",
                       e );

            initializationException = e;
        } finally {

            try {
                if ( writer != null ) {
                    writer.close();
                }
            } catch ( IOException e ) {
                l4j.error( "Caught exception closing IndexWriter " +
                           "in finally block", e );
            }
        }
    }

    private String blankNull( Object value ) {
        return value == null ? "" : value.toString();
    }
}

