package org.apache.lucene.index;

/* ====================================================================
 * The Apache Software License, Version 1.1
 *
 * Copyright (c) 2001 The Apache Software Foundation.  All rights
 * reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 *
 * 3. The end-user documentation included with the redistribution,
 *    if any, must include the following acknowledgment:
 *       "This product includes software developed by the
 *        Apache Software Foundation (http://www.apache.org/)."
 *    Alternately, this acknowledgment may appear in the software itself,
 *    if and wherever such third-party acknowledgments normally appear.
 *
 * 4. The names "Apache" and "Apache Software Foundation" and
 *    "Apache Lucene" must not be used to endorse or promote products
 *    derived from this software without prior written permission. For
 *    written permission, please contact apache@apache.org.
 *
 * 5. Products derived from this software may not be called "Apache",
 *    "Apache Lucene", nor may "Apache" appear in their name, without
 *    prior written permission of the Apache Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 */

import java.io.IOException;
import java.io.File;
import java.io.PrintStream;
import java.util.Vector;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.InputStream;
import org.apache.lucene.store.OutputStream;
import org.apache.lucene.document.Document;
import org.apache.lucene.analysis.Analyzer;

/**
 * IndexWriter2 is a modification of the original IndexWriter that ships
 * with Lucene. Like IndexWriter, it uses a RAMDirectory, but it puts it to
 * better use. The original IndexWriter treats the segments in the RAMDirectory
 * no differently from the segments in the target directory, where the index is
 * being built. For example, it ALWAYS merges RAMDirectory segments into the
 * target directory. Here, we optimize the usage of the RAMDirectory in the
 * following way:<br>
 *
 * When a new Document is added, a new 1-document segment for it is created in
 * the RAMDirectory. Once the RAMDirectory has collected 'maxDocsInRam' such
 * segments (this is a new, important setting; the default is 2000),
 * IndexWriter2 merges them into a single maxDocsInRam-document segment, still
 * inside the RAMDirectory (here is a difference from IndexWriter). It then
 * moves this segment from the RAMDirectory to the target directory (usually a
 * file-system directory). This way, during indexing, IndexWriter2 writes
 * segments of equal size (equal to maxDocsInRam) to the target directory. In
 * other words, during indexing only one file-system segment is open and being
 * written at a time, which uses just a few file handles. No more "Too many
 * open files" exceptions.<br>
 *
 * After indexing is finished, it is a good idea to call optimize() to merge
 * all created segments into one. The RAMDirectory is out of the picture here
 * and is not used. This is where the mergeFactor setting comes in: a total of
 * mergeFactor segments are merged at once into one new segment. This happens
 * in a loop, until only 1 segment is left. Here you can still run into a
 * "Too many open files" exception if your mergeFactor is large. If you set
 * mergeFactor to 2, only 2 segments are merged at a time, which conserves
 * file handles but is a bit slower than a merge with mergeFactor=10, for
 * example.<br>
 *
 * At the end of mergeSegments() there was originally code that, if a segment
 * file could not be deleted (because it was still open on Windows), stored
 * its name in a file named 'deletable', so that deletion could be retried
 * later. I believe there was some bug with the merged segments not being
 * closed properly, which was the reason for all of this. Anyway, there are
 * now no problems with deleting these files on Windows, so the code that
 * reads and writes the 'deletable' file is commented out.<br>
 *
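 * A minimal usage sketch follows. The SimpleAnalyzer and the single text
 * field are illustrative assumptions; substitute whatever your application
 * uses:
 * <pre>
 *   IndexWriter2 writer = new IndexWriter2("/tmp/index", new SimpleAnalyzer(), true);
 *   writer.maxDocsInRam = 2000;      // flush one merged segment to disk per 2000 docs
 *   Document doc = new Document();
 *   doc.add(Field.Text("body", "some text to index"));
 *   writer.addDocument(doc);         // repeat for each document
 *   writer.optimize();               // merge all file-system segments into one
 *   writer.close();
 * </pre>
 *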
 * @author Ivaylo Zlatev (ivaylo_zlatev@yahoo.com)
 */

public final class IndexWriter2 {

  private       Directory directory;			  // where this index will finally be built
  private final Directory ramDirectory = new RAMDirectory(); // for temporary storing and merging of segments

  private SegmentInfos segmentInfos    = new SegmentInfos(); // the target index segments
  private SegmentInfos ramSegmentInfos = new SegmentInfos(); // the ram segments

  private DocumentWriter ramDocWriter;
  private Analyzer analyzer;			  // how to analyze text

  private Lock writeLock;

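  /** The maximum number of terms that will be indexed for a single field in a
    document; terms beyond this limit are dropped. */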
  public int maxFieldLength = 10000;

  /** Constructs an IndexWriter for the index in <code>path</code>.  Text will
    be analyzed with <code>a</code>.  If <code>create</code> is true, then a
    new, empty index will be created in <code>path</code>, replacing the index
    already there, if any. */
  public IndexWriter2(String path, Analyzer a, boolean create)
       throws IOException {
    this(FSDirectory.getDirectory(path, create), a, create);
  }

  /** Constructs an IndexWriter for the index in <code>path</code>.  Text will
    be analyzed with <code>a</code>.  If <code>create</code> is true, then a
    new, empty index will be created in <code>path</code>, replacing the index
    already there, if any. */
  public IndexWriter2(File path, Analyzer a, boolean create)
       throws IOException {
    this(FSDirectory.getDirectory(path, create), a, create);
  }

  /** Constructs an IndexWriter for the index in <code>d</code>.  Text will be
    analyzed with <code>a</code>.  If <code>create</code> is true, then a new,
    empty index will be created in <code>d</code>, replacing the index already
    there, if any. */
  public IndexWriter2(Directory d, Analyzer a, final boolean create)
       throws IOException {
    directory = d;
    analyzer = a;
    ramDocWriter = new DocumentWriter(ramDirectory, analyzer, maxFieldLength);

    Lock writeLock = directory.makeLock("write.lock");
    if (!writeLock.obtain())                      // obtain write lock
      throw new IOException("Index locked for write: " + writeLock);
    this.writeLock = writeLock;                   // save it

    synchronized (directory) {			  // in- & inter-process sync
      new Lock.With(directory.makeLock("commit.lock")) {
        public Object doBody() throws IOException {
          if (create)
            segmentInfos.write(directory);
          else
            segmentInfos.read(directory);
          return null;
        }
      }.run();
    }
  }

  /** Flushes all changes to an index, closes all associated files, and closes
    the directory that the index is stored in. */
  public final synchronized void close() throws IOException {
    flushRamSegments();
    ramDirectory.close();
    writeLock.release();                          // release write lock
    writeLock = null;
    directory.close();
  }

  /** Release the write lock, if needed. */
  protected final void finalize() throws IOException {
    if (writeLock != null) {
      writeLock.release();                        // release write lock
      writeLock = null;
    }
  }

  /** Returns the number of documents currently in this index. */
  public final synchronized int docCount() {
    int count = 0;
    for (int i = 0; i < segmentInfos.size(); i++) {
      SegmentInfo si = segmentInfos.info(i);
      count += si.docCount;
    }

    for (int i = 0; i < ramSegmentInfos.size(); i++) {
      SegmentInfo si = ramSegmentInfos.info(i);
      count += si.docCount;
    }

    return count;
  }

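  /** The number of 1-document segments to collect in the RAMDirectory before
    they are merged and flushed to the target directory as a single segment. */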
  public int maxDocsInRam = 2000;

  /** Adds a document to this index. */
  public final void addDocument(Document doc) throws IOException {
    String segmentName = newSegmentName(ramSegmentInfos);
    ramDocWriter.addDocument(segmentName, doc);
    synchronized (this) {
      ramSegmentInfos.addElement(new SegmentInfo(segmentName, 1, ramDirectory));
      if (ramSegmentInfos.size() >= maxDocsInRam)
        flushRamSegments();
    }
  }

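  /** Generates a new segment name ("_0", "_1", ... "_a", ...) from the given
    SegmentInfos' counter, encoded in base 36. */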
  private final synchronized String newSegmentName(SegmentInfos sis) {
    return "_" + Integer.toString(sis.counter++, Character.MAX_RADIX);
  }

  /** If non-null, information about merges will be printed to this. */
  public PrintStream infoStream = null;

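  /** The number of segments merged at a time by optimize(). Larger values
    merge faster but hold more files open at once. */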
  public int mergeFactor = 10;

  /** Merges all segments together into a single segment, optimizing an index
      for search. */
  public final synchronized void optimize() throws IOException {
    flushRamSegments();

    while (segmentInfos.size() > 1 ||
           (segmentInfos.size() == 1 &&
            (SegmentReader.hasDeletions(segmentInfos.info(0)) ||
             segmentInfos.info(0).dir != directory))) {
      int minSegment = segmentInfos.size() - mergeFactor;
      mergeSegments(segmentInfos, (minSegment < 0 ? 0 : minSegment), segmentInfos, directory);
    }
  }

  /** Merges all segments from an array of indexes into this index.
   *
   * <p>This may be used to parallelize batch indexing.  A large document
   * collection can be broken into sub-collections.  Each sub-collection can be
   * indexed in parallel, on a different thread, process or machine.  The
   * complete index can then be created by merging sub-collection indexes
   * with this method.
   *
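   * <p>A minimal sketch (the paths here are hypothetical) that merges two
   * sub-indexes built elsewhere into this index:
   * <pre>
   *   Directory[] dirs = new Directory[] {
   *     FSDirectory.getDirectory("/tmp/part1", false),
   *     FSDirectory.getDirectory("/tmp/part2", false)
   *   };
   *   writer.addIndexes(dirs);  // 'writer' is an open IndexWriter2 on the target index
   * </pre>
   *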
   * <p>After this completes, the index is optimized. */
  public final synchronized void addIndexes(Directory[] dirs) throws IOException {
    optimize();					  // start with zero or 1 seg
    for (int i = 0; i < dirs.length; i++) {
      SegmentInfos sis = new SegmentInfos();	  // read infos from dir
      sis.read(dirs[i]);
      for (int j = 0; j < sis.size(); j++) {
        segmentInfos.addElement(sis.info(j));	  // add each info
      }
    }
    optimize();					  // final cleanup
  }

  /** Merges all RAM-resident segments into one RAM segment and then merges
   *  that single segment into the target directory (where the index is being
   *  built).
   **/
  private final void flushRamSegments() throws IOException {
    if (ramSegmentInfos.size() == 0)
      return;                                     // nothing to do

    if (ramSegmentInfos.size() > 1)               // make one segment in the ramDirectory
      mergeSegments(ramSegmentInfos, 0, ramSegmentInfos, ramDirectory);

    //merge the one ram segment into the directory:
    mergeSegments(ramSegmentInfos, 0, segmentInfos, directory);
  }


  /** Pops segments off of the sourceSegmentInfos stack down to minSegment, merges
    them, and pushes the merged segment onto the top of the targetSegmentInfos
    stack. */
  private final void mergeSegments(SegmentInfos sourceSegmentInfos, int minSegment, final SegmentInfos targetSegmentInfos, final Directory targetDirectory)
      throws IOException {
    String mergedName = newSegmentName(targetSegmentInfos);
    int mergedDocCount = 0;
    if (infoStream != null) infoStream.print("merging segments");
    SegmentMerger merger = new SegmentMerger(targetDirectory, mergedName);

    // maps each SegmentInfo to a Vector of the file names comprising that segment
    final HashMap segmentsToDelete = new HashMap();

    for (int i = minSegment; i < sourceSegmentInfos.size(); i++) {
      SegmentInfo si = sourceSegmentInfos.info(i);
      if (infoStream != null)
        infoStream.print(" " + si.name + " (" + si.docCount + " docs)");
      SegmentReader reader = new SegmentReader(si);
      merger.add(reader);
      segmentsToDelete.put(si, reader.files());
      mergedDocCount += si.docCount;
    }

    if (infoStream != null)
      infoStream.println("\n into "+mergedName+" ("+mergedDocCount+" docs)");

    merger.merge(); //it will close the opened SegmentReader-s at the end.

    sourceSegmentInfos.setSize(minSegment);		  // pop old infos & add new
    targetSegmentInfos.addElement(new SegmentInfo(mergedName, mergedDocCount, targetDirectory));

    synchronized (targetDirectory) {			  // in- & inter-process sync
      new Lock.With(directory.makeLock("commit.lock")) {
        public Object doBody() throws IOException {
          targetSegmentInfos.write(targetDirectory);	  // commit before deleting

          // files which cannot be deleted right now are collected here (and,
          // in the original scheme, saved to a file named 'deletable';
          // see the commented-out code below)
          Vector deletable = new Vector();

          for (Iterator iter = segmentsToDelete.keySet().iterator(); iter.hasNext(); ) {
            SegmentInfo si = (SegmentInfo) iter.next();
            Vector filesToDelete = (Vector) segmentsToDelete.get(si);
            deleteFiles(filesToDelete, deletable, si.dir); // delete now-unused segments
          }

          if (infoStream != null && !deletable.isEmpty())
            infoStream.println("The following files could not be deleted: " + deletable);
          //deleteFiles(readDeleteableFiles(), deletable, directory); // try to delete the files listed in the 'deletable' file
          //writeDeleteableFiles(deletable);		  // note files we can't delete

          return null;
        }
      }.run();
    }

  }

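  /** Tries to delete each named file from directory <code>d</code>; files that
    cannot be deleted yet (e.g. because they are still open on Windows) are
    added to <code>deletable</code>. */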
  private final void deleteFiles(Vector files, Vector deletable, Directory d)
       throws IOException {
    for (int i = 0; i < files.size(); i++) {
      String file = (String)files.elementAt(i);
      try {
        d.deleteFile(file);		  // try to delete each file
      } catch (IOException e) {			  // if delete fails
        if (d.fileExists(file)) {		  // only if it still exists
          deletable.addElement(file);		  // add to deletable
        }
      }
    }
  }

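  /** Reads the file names recorded in the 'deletable' file. Currently unused;
    see the commented-out calls in mergeSegments(). */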
  private final Vector readDeleteableFiles() throws IOException {
    Vector result = new Vector();
    if (!directory.fileExists("deletable"))
      return result;

    InputStream input = directory.openFile("deletable");
    try {
      for (int i = input.readInt(); i > 0; i--)	  // read file names
        result.addElement(input.readString());
    } finally {
      input.close();
    }
    return result;
  }

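  /** Rewrites the 'deletable' file (via a temporary 'deleteable.new') with the
    given file names. Currently unused; see the commented-out calls in
    mergeSegments(). */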
  private final void writeDeleteableFiles(Vector files) throws IOException {
    OutputStream output = directory.createFile("deleteable.new");
    try {
      output.writeInt(files.size());
      for (int i = 0; i < files.size(); i++)
        output.writeString((String)files.elementAt(i));
    } finally {
      output.close();
    }
    directory.renameFile("deleteable.new", "deletable");
  }

}
