package com.simplegeo.penelope.db;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Table;
import org.apache.log4j.Logger;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

/**
 * Periodically compares the total size of the commit log segment files with a
 * threshold and, when the threshold is reached, forces a flush of every
 * column family store of every table reported by {@link DatabaseDescriptor}.
 *
 * <p>The check runs on a dedicated {@link Timer} thread that is started by the
 * constructor; call {@link #stop()} to cancel it.
 */
public class PeriodicMemtableFlusher {
    private static final Logger logger = Logger.getLogger(PeriodicMemtableFlusher.class);

    /** Matches commit log segment file names such as {@code CommitLog-1234.log}. */
    private static final Pattern COMMIT_LOG_NAME = Pattern.compile("CommitLog-\\d+\\.log");

    private final long threshold;
    private final Timer timer;

    /**
     * Checks the total size of commit logs periodically and, if it is greater
     * than or equal to {@code threshold}, flushes memtables so that the commit
     * logs can be removed.
     *
     * @param interval how often we should check commit log size; also used as
     *                 the delay before the first check
     * @param unit the unit of interval
     * @param threshold the size of commit logs, in bytes, before we should flush
     */
    public PeriodicMemtableFlusher(long interval, TimeUnit unit, long threshold) {
        // might want to add a safeguard that won't let you set threshold < commit log segment size
        this.threshold = threshold;
        this.timer = new Timer();

        long periodMillis = unit.toMillis(interval);
        this.timer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    PeriodicMemtableFlusher.this.maybeFlushSSTables();
                } catch (RuntimeException e) {
                    // An exception escaping run() terminates the Timer thread and
                    // silently cancels all future checks, so log it and carry on.
                    logger.error("Unexpected error during periodic memtable flush check", e);
                }
            }
        }, periodMillis, periodMillis);
    }

    /**
     * Cancels the periodic check. A check that is already running is allowed
     * to finish; no further checks are scheduled afterwards.
     */
    public void stop() {
        timer.cancel();
    }

    /**
     * Flushes all column family stores if the commit log size has reached the
     * threshold.
     *
     * <p>An {@link IOException} raised while opening or flushing a table is
     * logged and the remaining tables are still processed.
     *
     * @return {@code true} if the threshold was reached and a flush was
     *         attempted, {@code false} otherwise
     */
    public boolean maybeFlushSSTables() {
        long size = commitLogSize();
        if (size < threshold) {
            logger.info("Commit log size: " + size + " is below flush threshold: " + threshold + ", nothing to see here.");
            return false;
        }

        logger.info("Commit log size: " + size + " is above flush threshold: " + threshold + " flushing memtables.");
        for (String name : tables()) {
            try {
                Table table = Table.open(name);
                for (ColumnFamilyStore store : table.getColumnFamilyStores()) {
                    store.forceFlush();
                }
            } catch (IOException e) {
                logger.error("Error flushing SSTable: " + name, e);
            }
        }
        return true;
    }

    /**
     * @return the table names reported by {@link DatabaseDescriptor#getTables()}
     */
    public Set<String> tables() {
        return DatabaseDescriptor.getTables();
    }

    /**
     * Sums the lengths of all commit log segment files found directly inside
     * the configured commit log directory.
     *
     * @return the combined size in bytes, or {@code 0} if the directory cannot
     *         be listed (for example because it does not exist)
     */
    public long commitLogSize() {
        String directory = DatabaseDescriptor.getLogFileLocation();
        File[] files = new File(directory).listFiles(new FilenameFilter() {
            @Override
            public boolean accept(File dir, String name) {
                return COMMIT_LOG_NAME.matcher(name).matches();
            }
        });

        // listFiles returns null (not an empty array) when the path is not a
        // directory or an I/O error occurs.
        if (files == null) {
            logger.warn("Unable to list commit log directory: " + directory);
            return 0;
        }

        long size = 0;
        for (File file : files) {
            size += file.length();
        }
        return size;
    }
}
