/**
 * @author Marc Kramis (mkramis@student.ethz.ch)
 * @version 1.0.43
 *
 * requires Java 1.2 API or higher
 * -----------------------------------------------------------------------------
 * SQLDirectory as persistence layer based on a RDBMS. required are two
 * tables and one combined index:
 * table LUCENE_FILE:   primary key filename varchar(256)
 *                      modified integer(8)
 *                      length integer(8)
 *                      reader integer (8)
 * table LUCENE_BUFFER: filename varchar(256)
 *                      buffernumber integer(8)
 *                      buffer binary(1024)
 * index LUCENE_INDEX:  on LUCENE_BUFFER (filename, buffernumber)
 *
 * note: if more than one index is required in one tablespace, use other names.
 * note: remember to use a high merge factor (e.g. 100) for batch indexing.
 * -----------------------------------------------------------------------------
 * priorities: 1=high, 2=medium, 3=low
 * @todo 2:exception handling, throw IOException instead of IllegalStateExcp.
 * @todo 3:since SQLDirectory is slower than FSDirectory, the current locking
 *         mechanism might cause timeouts more often. check for random exponential
 *         timeout (see TCP/IP) instead of linear or other locking mechanism.
 * @todo 2:how to handle broken connections?
 * @todo 2:open reader problem: if reader fails, reader is not decremented
 * @todo 2:in-process thread safety?
 * @todo 3:test for several drivers and db systems
 */

import org.apache.lucene.store.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Hashtable;
import java.sql.DriverManager;
import java.sql.Connection;
import java.sql.Statement;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * Lucene {@link Directory} whose files live in two relational tables: one row
 * per file in the file table and one row per 1024 byte block in the buffer
 * table. All statements run on a single JDBC connection with autocommit
 * switched off; every modifying operation commits on success and rolls back
 * on failure.
 */
final public class SQLDirectory extends Directory {

  private final static String DEFAULT_FILE_TABLE = "LUCENE_FILE";
  private final static String DEFAULT_BUFFER_TABLE = "LUCENE_BUFFER";

  private final Connection con;
  private final String fileTable;
  private final String bufferTable;

  /**
   * number of buffers each opened input stream may cache.
   * read-only processes might use much higher sizes.
   * minimum is 2 (smaller values are raised to 2 when a file is opened)
   */
  public int cacheSize = 32;

  /**
   * accept "outside" connection for connection reusing. the default table
   * names are used and autocommit is switched off on the given connection.
   *
   * @param con open JDBC connection to work on
   * @throws IllegalArgumentException if autocommit cannot be switched off
   */
  public SQLDirectory(Connection con) {
    // assign parameter
    this.con = con;
    this.fileTable = DEFAULT_FILE_TABLE;
    this.bufferTable = DEFAULT_BUFFER_TABLE;
    // initialization
    try {
      con.setAutoCommit(false);
    } catch (SQLException se) {
      throw new IllegalArgumentException(se.getMessage());
    }
  }

  /**
   * Convenience Constructor
   * using default table names. create is set to false (only read or append)
   */
  public SQLDirectory(String driver,
                      String url,
                      String user,
                      String login) {
    this(driver, url, user, login, DEFAULT_FILE_TABLE, DEFAULT_BUFFER_TABLE, false);
  }

  /**
   * important for transactional behaviour: autocommit set to false!
   * this feature lets us profit from db transactions. (except mySQL?)
   * (no parameter check is performed)
   *
   * @param driver      class name of the JDBC driver to load
   * @param url         JDBC url of the database
   * @param user        database user
   * @param login       password of the database user
   * @param fileTable   name of the table holding one row per file
   * @param bufferTable name of the table holding the file buffers
   * @param create      if true, all existing files and buffers are deleted
   * @throws IllegalArgumentException if the driver cannot be loaded or any
   *         database operation of the initialization fails
   */
  public SQLDirectory(String driver,
                      String url,
                      String user,
                      String login,
                      String fileTable,
                      String bufferTable,
                      boolean create)  {
    // assign parameter
    this.fileTable = fileTable;
    this.bufferTable = bufferTable;
    // initialization
    Connection opened = null;
    try {
      Class.forName(driver);
      opened = DriverManager.getConnection(url, user, login);
      opened.setAutoCommit(false);
    } catch (ClassNotFoundException ce) {
      throw new IllegalArgumentException("invalid driver: " + ce.getMessage());
    } catch (SQLException se) {
      // do not leak a connection that was opened but could not be configured
      closeQuietly(opened);
      throw new IllegalArgumentException("invalid url, user or login: " + se.getMessage());
    }
    this.con = opened;
    if (create) {
      try {
        create();
      } catch (SQLException se) {
        closeQuietly(opened);
        throw new IllegalArgumentException("invalid url, user or login: " + se.getMessage());
      }
    }
  }

  /**
   * Recovery
   * if some writer or reader process has failed. make sure no other reader or
   * writer threads exist. then recover by resetting the reader counter of
   * every file to 0 and deleting all entries whose filename ends with
   * ".lock" from both tables.
   *
   * @throws IOException if the database operation or the rollback fails
   */
  public final void recover() throws IOException {
    Statement stmt = null;
    try {
      stmt = con.createStatement();
      // clear all reader
      stmt.executeUpdate("update " + fileTable + " set reader = 0");
      stmt.executeUpdate("delete from " + fileTable + " where filename like '%.lock'");
      stmt.executeUpdate("delete from " + bufferTable + " where filename like '%.lock'");
      con.commit();
    } catch (SQLException e) {
      throw rollbackAfter(e);
    } finally {
      closeQuietly(stmt);
    }
  }

  /**
   * delete all files and buffers
   *
   * @throws SQLException the original failure, or a "rollback failed"
   *         exception if the rollback itself fails
   */
  private final void create() throws SQLException {
    Statement stmt = null;
    try {
      stmt = con.createStatement();
      // delete all existing files and buffers
      stmt.executeUpdate("delete from " + bufferTable);
      stmt.executeUpdate("delete from " + fileTable);
      con.commit();
    } catch (SQLException e) {
      try {
        con.rollback();
      } catch (SQLException se) {
        throw new SQLException("rollback failed: " + se.getMessage());
      }
      // rethrow the original exception so SQL state and vendor code survive
      throw e;
    } finally {
      closeQuietly(stmt);
    }
  }

  /** Returns an array of strings, one for each file in the directory. */
  public final String[] list() throws IOException {
    Statement stmt = null;
    ResultSet rs = null;
    try {
      ArrayList names = new ArrayList();
      stmt = con.createStatement();
      rs = stmt.executeQuery("select filename from " + fileTable);
      while (rs.next()) {
        names.add(rs.getString(1));
      }
      // toArray() without argument yields Object[], which cannot be cast to
      // String[]; pass a typed array instead
      return (String[]) names.toArray(new String[names.size()]);
    } catch (SQLException e) {
      throw new IOException(e.getMessage());
    } finally {
      closeQuietly(rs);
      closeQuietly(stmt);
    }
  }

  /** Returns true iff the named file exists in this directory. */
  public final boolean fileExists(String name) throws IOException {
    PreparedStatement pstmt = null;
    ResultSet rs = null;
    try {
      pstmt = con.prepareStatement("select filename from "
          + fileTable + " where filename = ?");
      pstmt.setString(1, name);
      rs = pstmt.executeQuery();
      return rs.next();
    } catch (SQLException e) {
      throw new IOException(e.getMessage());
    } finally {
      closeQuietly(rs);
      closeQuietly(pstmt);
    }
  }

  /**
   * Returns the time the named file was last modified.
   *
   * @throws IOException if the file does not exist or the query fails
   */
  public final long fileModified(String name) throws IOException {
    return queryLong("modified", name);
  }

  /**
   * Returns the length in bytes of a file in the directory.
   *
   * @throws IOException if the file does not exist or the query fails
   */
  public final long fileLength(String name) throws IOException {
    return queryLong("length", name);
  }

  /**
   * Removes an existing file in the directory.
   *
   * @throws IOException if no file entry with this name was deleted or the
   *         database operation fails
   */
  public final void deleteFile(String name) throws IOException {
    String[] args = new String[] { name };
    try {
      int deleted = runUpdate("delete from " + fileTable + " where filename = ?", args);
      if (deleted == 0) {
        throw new IOException("couldn't delete " + name);
      }
      runUpdate("delete from " + bufferTable + " where filename = ?", args);
      con.commit();
    } catch (SQLException e) {
      throw rollbackAfter(e);
    }
  }

  /**
   * Renames an existing file in the directory. An already existing file
   * with the target name is replaced.
   */
  public final void renameFile(String from, String to) throws IOException {
    String[] target = new String[] { to };
    String[] rename = new String[] { to, from };
    try {
      // drop a possibly existing target first
      runUpdate("delete from " + fileTable + " where filename = ?", target);
      runUpdate("delete from " + bufferTable + " where filename = ?", target);
      // then move the file entry and all of its buffers
      runUpdate("update " + fileTable + " set filename = ? where filename = ?", rename);
      runUpdate("update " + bufferTable + " set filename = ? where filename = ?", rename);
      con.commit();
    } catch (SQLException e) {
      throw rollbackAfter(e);
    }
  }

  /** Creates a new, empty file in the directory with the given name.
      Returns a stream writing this file. */
  public final OutputStream createFile(String name) throws IOException {
    try {
      insertFileEntry(name);
      con.commit();
    } catch (SQLException e) {
      throw rollbackAfter(e);
    }
    return new SQLOutputStream(con, fileTable, bufferTable, name);
  }

  /** Returns a stream reading an existing file. */
  public final InputStream openFile(String name) throws IOException {
    // the documented minimum cache size is 2
    int effectiveCacheSize = Math.max(cacheSize, 2);
    return new SQLInputStream(con, fileTable, bufferTable, name, fileLength(name), effectiveCacheSize);
  }

  /** Construct a {@link Lock}. A lock is held while a file entry with the
   * lock's name exists in the file table.
   * @param name the name of the lock file
   */
  public final Lock makeLock(final String name) {
    return new Lock() {
      /**
       * tries to insert the lock entry.
       * @return true if the entry was inserted, false if the insert failed
       *         with an SQLException (e.g. because the entry already exists)
       */
      public boolean obtain() throws IOException {
        try {
          insertFileEntry(name);
          con.commit();
          return true;
        } catch (SQLException e) {
          try {
            con.rollback();
          } catch (Exception se) {
            throw new IOException("rollback failed: " + se.getMessage());
          }
          return false;
        }
      }
      /** deletes the lock entry from the file table. */
      public void release() {
        try {
          runUpdate("delete from " + fileTable + " where filename = ?",
              new String[] { name });
          con.commit();
        } catch (SQLException e) {
          try {
            con.rollback();
          } catch (SQLException se) {
            throw new IllegalStateException("rollback failed: " + se.getMessage());
          }
          throw new IllegalStateException(e.getMessage());
        }
      }
    };
  }

  /** Closes the store to future operations. */
  public final void close() {
    try {
      // close only a connection that is still open
      if (con != null && !con.isClosed()) {
        con.close();
      }
    } catch (Exception e) {
      throw new IllegalStateException(e.getMessage());
    }
  }

  /**
   * inserts an empty file entry (length 0, no reader) stamped with the
   * current time. the caller is responsible for commit and rollback.
   */
  private void insertFileEntry(String name) throws SQLException {
    runUpdate("insert into "
        + fileTable + " (filename, modified, length, reader) values (?, "
        + System.currentTimeMillis() + ", 0, 0)", new String[] { name });
  }

  /**
   * executes one modifying statement with the given string parameters bound
   * in order and returns the number of affected rows. binding instead of
   * concatenating keeps names containing quotes from breaking the statement.
   */
  private int runUpdate(String sql, String[] args) throws SQLException {
    PreparedStatement pstmt = con.prepareStatement(sql);
    try {
      for (int i = 0; i < args.length; i++) {
        pstmt.setString(i + 1, args[i]);
      }
      return pstmt.executeUpdate();
    } finally {
      closeQuietly(pstmt);
    }
  }

  /** reads one numeric column of the file entry with the given name. */
  private long queryLong(String column, String name) throws IOException {
    PreparedStatement pstmt = null;
    ResultSet rs = null;
    try {
      pstmt = con.prepareStatement("select " + column + " from "
          + fileTable + " where filename = ?");
      pstmt.setString(1, name);
      rs = pstmt.executeQuery();
      if (!rs.next()) {
        throw new IOException("no such file: " + name);
      }
      return rs.getLong(1);
    } catch (SQLException e) {
      throw new IOException(e.getMessage());
    } finally {
      closeQuietly(rs);
      closeQuietly(pstmt);
    }
  }

  /**
   * rolls the current transaction back and returns the exception to throw:
   * the rollback failure if there is one, the original failure otherwise.
   */
  private IOException rollbackAfter(SQLException cause) {
    try {
      con.rollback();
    } catch (SQLException se) {
      return new IOException("rollback failed: " + se.getMessage());
    }
    return new IOException(cause.getMessage());
  }

  /** closes a statement during cleanup; a failing close must not hide the primary outcome. */
  private static void closeQuietly(Statement stmt) {
    if (stmt != null) {
      try {
        stmt.close();
      } catch (SQLException ignored) {
        // nothing sensible can be done here
      }
    }
  }

  /** closes a result set during cleanup; a failing close must not hide the primary outcome. */
  private static void closeQuietly(ResultSet rs) {
    if (rs != null) {
      try {
        rs.close();
      } catch (SQLException ignored) {
        // nothing sensible can be done here
      }
    }
  }

  /** closes a connection during cleanup; a failing close must not hide the primary outcome. */
  private static void closeQuietly(Connection connection) {
    if (connection != null) {
      try {
        connection.close();
      } catch (SQLException ignored) {
        // nothing sensible can be done here
      }
    }
  }
}

/**
 * SqlInputStream to read virtual files. the file content is fetched from the
 * buffer table in blocks of SQL_BUFFER_SIZE bytes; fetched blocks are kept in
 * an SQLCache. opening and cloning a stream increments the reader counter of
 * the file entry, closing it decrements the counter.
 * note: cloneable interface!
 */
final class SQLInputStream extends InputStream implements Cloneable {

  private static final int SQL_BUFFER_SIZE = 1024;

  private final Connection con;
  private final String fileTable;
  private final String bufferTable;
  private final String filename;

  private final SQLCache cache;
  /** select for one buffer; parameters: 1 = filename, 2 = buffernumber */
  private final String sqlQuery;
  /** position of the next byte handed out by readInternal */
  private long pointer;

  /**
   * SqlInputStream constructor
   * make sure a cloned instance is well behaving by implementing the clone()
   * method
   *
   * @param con         connection used for all statements of this stream
   * @param fileTable   name of the table holding the file entries
   * @param bufferTable name of the table holding the file buffers
   * @param filename    name of the file to read
   * @param length      length of the file in bytes
   * @param cacheSize   number of buffers to cache
   * @throws IllegalStateException if the reader counter cannot be incremented
   */
  public SQLInputStream(Connection con, String fileTable, String bufferTable, String filename, long length, int cacheSize) {
    // assign parameter
    this.con = con;
    this.fileTable = fileTable;
    this.bufferTable = bufferTable;
    this.filename = filename;
    this.length = length;
    // initialization
    cache = new SQLCache(cacheSize);
    // the filename is bound as a parameter so names containing quotes work
    sqlQuery = "select buffer from " + bufferTable
        + " where filename = ? and buffernumber = ?";
    pointer = 0;
    updateReader('+');
  }

  /**
   * make sure, each clone reading increases the reader counter
   * NOTE(review): super.clone() presumably copies the fields by reference, in
   * which case the clone shares connection and cache with this instance -
   * confirm against the Lucene InputStream in use.
   */
  public Object clone() {
    updateReader('+');
    return super.clone();
  }

  /**
   * InputStream methods
   *
   * copies len bytes, starting at the current pointer, into dest and advances
   * the pointer by len. buffers not yet cached are fetched from the database.
   *
   * @param dest       array to copy into
   * @param destOffset index of the first byte written in dest
   * @param len        number of bytes to copy
   * @throws IllegalStateException on any failure (database error, missing
   *         buffer, invalid array bounds); the pointer is left unchanged
   **/
  public final void readInternal(byte[] dest, int destOffset, int len) {
    PreparedStatement pstmtSelect = null;
    try {

      int remainder = len;
      long start = pointer;

      while (remainder != 0) {
        long bufferNumber = start/SQL_BUFFER_SIZE;
        Long bufferNr = new Long(bufferNumber);
        int bufferOffset = (int)(start%SQL_BUFFER_SIZE);
        int bytesInBuffer = SQL_BUFFER_SIZE - bufferOffset;
        int bytesToCopy = bytesInBuffer >= remainder ? remainder : bytesInBuffer;
        byte[] buffer = cache.get(bufferNr);
        if (buffer == null) {
          // prepare lazily: reads served entirely from the cache need no statement
          if (pstmtSelect == null) {
            pstmtSelect = con.prepareStatement(sqlQuery);
            pstmtSelect.setString(1, filename);
          }
          buffer = fetchBuffer(pstmtSelect, bufferNumber);
          cache.put(bufferNr, buffer);
        }
        System.arraycopy(buffer, bufferOffset, dest, destOffset, bytesToCopy);
        destOffset += bytesToCopy;
        start += bytesToCopy;
        remainder -= bytesToCopy;
      }

      pointer += len;

    } catch (Exception e) {
      throw new IllegalStateException(e.getMessage());
    } finally {
      closeQuietly(pstmtSelect);
    }
  }

  /**
   * decrements the reader counter of the file. must be called once per
   * opened or cloned stream.
   */
  public final void close() {
    updateReader('-');
  }

  /**
   * random access: repositioning of pointer
   * @param position new position of the pointer
   */
  public final void seekInternal(long position) {
    pointer = position;
  }

  /**
   * loads one buffer of this file from the database.
   * @throws IllegalStateException if the buffer table has no such row
   */
  private byte[] fetchBuffer(PreparedStatement pstmtSelect, long bufferNumber) throws SQLException {
    pstmtSelect.setLong(2, bufferNumber);
    ResultSet rs = pstmtSelect.executeQuery();
    try {
      if (!rs.next()) {
        throw new IllegalStateException("missing buffer " + bufferNumber + " of file " + filename);
      }
      return rs.getBytes(1);
    } finally {
      rs.close();
    }
  }

  /**
   * helper method to modify the number of reader of a file
   * @param sign '+' == increment; '-' == decrement
   * @throws IllegalStateException if the update or the rollback fails
   */
  private final void updateReader(char sign) {
    PreparedStatement pstmt = null;
    try {
      pstmt = con.prepareStatement("update "
          + fileTable + " set reader = reader " + sign + " 1 where filename = ?");
      pstmt.setString(1, filename);
      pstmt.executeUpdate();
      con.commit();
    } catch (SQLException e) {
      try {
        con.rollback();
      } catch (SQLException se) {
        throw new IllegalStateException("rollback failed: " + se.getMessage());
      }
      throw new IllegalStateException(e.getMessage());
    } finally {
      closeQuietly(pstmt);
    }
  }

  /** closes a statement during cleanup; a failing close must not hide the primary outcome. */
  private static void closeQuietly(PreparedStatement pstmt) {
    if (pstmt != null) {
      try {
        pstmt.close();
      } catch (SQLException ignored) {
        // nothing sensible can be done here
      }
    }
  }
}

/**
 * SqlOutputStream to write virtual files. the content is stored in the buffer
 * table in blocks of SQL_BUFFER_SIZE bytes; length and modification time of
 * the file entry are updated with every flush.
 *
 * @todo the 0th buffer seems to be overwritten from time to time by higher
 *       level api (4 bytes) -> (should be cached there?)
 * @todo move file status writing from flushBuffer() to close(). goal: do it
 *       only once. (transaction safety?)
 * @todo check batch processing. seems not to work with standard JDBC-ODBC bridge
 *       driver
 */
final class SQLOutputStream extends OutputStream {

  private static final int SQL_BUFFER_SIZE = 1024;

  private final Connection con;
  private final String fileTable;
  private final String bufferTable;
  private final String filename;

  /** number of bytes of the file, i.e. the highest position written so far */
  private long length;
  /** position in the file the next flushed byte is written to */
  private long pointer;
  /** number of buffers inserted into the buffer table so far */
  private int bufferSize;
  /** in-memory copy of buffer 0, so rewriting it needs no select */
  private byte[] firstBuffer;

  /** updates modified (1) and length (2) of the file entry (3) */
  private final PreparedStatement pstmtCreate;
  /** selects the buffer of file (1) with buffernumber (2) */
  private final PreparedStatement pstmtSelect;
  /** sets the buffer (1) of file (2) with buffernumber (3) */
  private final PreparedStatement pstmtUpdate;
  /** inserts filename (1), buffernumber (2) and buffer (3) */
  private final PreparedStatement pstmtInsert;

  /**
   * prepares all statements needed for writing the given file.
   *
   * @param con         connection used for all statements of this stream
   * @param fileTable   name of the table holding the file entries
   * @param bufferTable name of the table holding the file buffers
   * @param filename    name of the file to write
   * @throws IllegalStateException if a statement cannot be prepared
   */
  public SQLOutputStream(Connection con, String fileTable, String bufferTable, String filename) {
    // assign parameter
    this.con = con;
    this.fileTable = fileTable;
    this.bufferTable = bufferTable;
    this.filename = filename;
    // initialization
    pointer = 0;
    bufferSize = 0;
    length = 0;
    PreparedStatement create = null;
    PreparedStatement select = null;
    PreparedStatement update = null;
    PreparedStatement insert = null;
    try {
      // the filename is always bound as a parameter so names containing
      // quotes cannot break the statements
      create = con.prepareStatement("update " + fileTable
          + " set modified = ?, length = ? where filename = ?");
      select = con.prepareStatement("select buffer from "
          + bufferTable + " where filename = ? and buffernumber = ?");
      update = con.prepareStatement("update "
          + bufferTable + " set buffer = ? where filename = ? and buffernumber = ?");
      insert = con.prepareStatement("insert into "
          + bufferTable + " (filename, buffernumber, buffer) values ("
          + "?, ?, ?)");
    } catch (SQLException e) {
      // release whatever was prepared before the failure
      closeStatement(create, null);
      closeStatement(select, null);
      closeStatement(update, null);
      closeStatement(insert, null);
      throw new IllegalStateException(e.getMessage());
    }
    pstmtCreate = create;
    pstmtSelect = select;
    pstmtUpdate = update;
    pstmtInsert = insert;
  }

  /**
   * output methods:
   *
   * writes the first len bytes of src to the file, starting at the current
   * pointer, then updates the file entry and commits. data crossing a buffer
   * boundary is split over the affected buffers.
   *
   * @param src array holding the bytes to write
   * @param len number of bytes to write
   * @throws IllegalStateException if writing or the rollback fails
   */
  public final void flushBuffer(byte[] src, int len) {
    try {

      int srcOffset = 0;
      int remaining = len;
      long position = pointer;

      while (remaining > 0) {
        long bufferNumber = position/SQL_BUFFER_SIZE;
        // take the modulo before narrowing, otherwise positions beyond
        // Integer.MAX_VALUE yield a wrong offset
        int bufferOffset = (int)(position%SQL_BUFFER_SIZE);
        int bytesInBuffer = SQL_BUFFER_SIZE - bufferOffset;
        int bytesToCopy = bytesInBuffer >= remaining ? remaining : bytesInBuffer;
        writeChunk(src, srcOffset, bufferNumber, bufferOffset, bytesToCopy);
        srcOffset += bytesToCopy;
        position += bytesToCopy;
        remaining -= bytesToCopy;
      }

      // adapt file information: overwriting existing bytes (after a seek)
      // must not grow the file
      pointer = position;
      if (pointer > length) {
        length = pointer;
      }
      // write file information
      pstmtCreate.setLong(1, System.currentTimeMillis());
      pstmtCreate.setLong(2, length);
      pstmtCreate.setString(3, filename);
      pstmtCreate.executeUpdate();
      con.commit();
    } catch (Exception e) {
      try {
        con.rollback();
      } catch (SQLException se) {
        throw new IllegalStateException("rollback failed: " + se.getMessage());
      }
      throw new IllegalStateException(e.getMessage());
    }
  }

  /**
   * flushes pending data via super.close() and closes all prepared
   * statements. every statement is closed even if closing another one fails.
   *
   * @throws IllegalStateException if closing a statement fails
   */
  public final void close() throws IOException {
    SQLException failure = null;
    try {
      super.close();
    } finally {
      failure = closeStatement(pstmtCreate, failure);
      failure = closeStatement(pstmtSelect, failure);
      failure = closeStatement(pstmtUpdate, failure);
      failure = closeStatement(pstmtInsert, failure);
    }
    if (failure != null) {
      throw new IllegalStateException(failure.getMessage());
    }
  }

  /** Random-access methods */
  public final void seek(long pos) throws IOException {
    super.seek(pos);
    pointer = pos;
  }
  public final long length() throws IOException {
    return length;
  }

  /**
   * copies count bytes of src into one buffer and stores that buffer: the
   * buffer following the last existing one is inserted, an existing buffer
   * is read (buffer 0 from memory), modified and written back.
   */
  private void writeChunk(byte[] src, int srcOffset, long bufferNumber, int bufferOffset, int count) throws SQLException {
    byte[] buffer;
    if (bufferNumber == bufferSize) {
      // append new buffer
      buffer = new byte[SQL_BUFFER_SIZE];
      System.arraycopy(src, srcOffset, buffer, bufferOffset, count);
      pstmtInsert.setString(1, filename);
      pstmtInsert.setLong(2, bufferNumber);
      pstmtInsert.setBytes(3, buffer);
      pstmtInsert.executeUpdate();
      if (bufferNumber == 0) {
        firstBuffer = buffer;
      }
      bufferSize++;
    } else {
      // read buffer
      if (bufferNumber != 0) {
        buffer = readBuffer(bufferNumber);
      } else {
        buffer = firstBuffer;
      }
      // modify buffer
      System.arraycopy(src, srcOffset, buffer, bufferOffset, count);
      // write buffer back
      pstmtUpdate.setBytes(1, buffer);
      pstmtUpdate.setString(2, filename);
      pstmtUpdate.setLong(3, bufferNumber);
      pstmtUpdate.executeUpdate();
    }
  }

  /**
   * loads one buffer of this file from the database.
   * @throws IllegalStateException if the buffer table has no such row
   */
  private byte[] readBuffer(long bufferNumber) throws SQLException {
    pstmtSelect.setString(1, filename);
    pstmtSelect.setLong(2, bufferNumber);
    ResultSet rs = pstmtSelect.executeQuery();
    try {
      if (!rs.next()) {
        throw new IllegalStateException("missing buffer " + bufferNumber + " of file " + filename);
      }
      return rs.getBytes(1);
    } finally {
      rs.close();
    }
  }

  /**
   * closes one statement (null is ignored) and returns the first failure
   * seen so far: the given earlier one, else the failure of this close,
   * else null.
   */
  private static SQLException closeStatement(PreparedStatement pstmt, SQLException earlier) {
    if (pstmt != null) {
      try {
        pstmt.close();
      } catch (SQLException se) {
        if (earlier == null) {
          return se;
        }
      }
    }
    return earlier;
  }
}

/**
 * SQLCache
 * maps buffer numbers to buffers and bounds the number of cached buffers with
 * a ringbuffer of keys (constant access time).
 * - the slot at index 0 is filled by the first put and never reused, so the
 *   buffer cached first stays cached for the lifetime of the cache
 * - all other slots are reused in rotation (oldest element overwritten first)
 * - minimum size is 2; smaller sizes are raised to 2
 * - putting a buffer number that is already cached only replaces its buffer
 */
final class SQLCache extends Hashtable {

  /** one permanent slot plus at least one rotating slot */
  private static final int MIN_SIZE = 2;

  /** keys of the cached buffers, indexed by slot */
  private final Long[] ringbuffer;
  /** number of slots */
  private final int size;

  /** slot used by the next put of a new key */
  private int position;

  /**
   * @param size maximal number of cached buffers; values below 2 are
   *             treated as 2
   */
  public SQLCache(int size) {
    super(Math.max(size, MIN_SIZE));
    this.size = Math.max(size, MIN_SIZE);
    ringbuffer = new Long[this.size];
    position = 0;
  }

  /**
   * caches a buffer. if the current slot is occupied, the buffer registered
   * there is dropped from the cache first.
   *
   * @param bufferNumber key of the buffer
   * @param buffer       content of the buffer
   */
  public void put(Long bufferNumber, byte[] buffer) {
    // a key that is already cached keeps its slot; taking a second slot for
    // it would evict the entry early once either slot is reused
    if (containsKey(bufferNumber)) {
      super.put(bufferNumber, buffer);
      return;
    }
    // delete existing (must be garbage collected!)
    Long evicted = ringbuffer[position];
    if (evicted != null) {
      remove(evicted);
    }
    // insert new
    ringbuffer[position] = bufferNumber;
    super.put(bufferNumber, buffer);
    // rotate position, skipping the permanent slot 0
    position++;
    if (position == size) {
      position = 1;
    }
  }

  /**
   * @param bufferNumber key of the buffer
   * @return the cached buffer or null if the buffer is not cached
   */
  public byte[] get(Long bufferNumber) {
    return (byte[])super.get(bufferNumber);
  }
}