package com..hrl.zk;

import java.io.Closeable;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.test.TestingServer;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com..hrl.crawler.CrawlUrl;
import com..hrl.crawler.Utils;
import com..hrl.main.CrawlerPropertyFile;

/**
 * Keeps the distributed crawl queue at {@link Utils#CRAWL_QUEUE_PATH} topped up
 * with {@link CrawlUrl} items.
 *
 * <p>The constructor fills the queue with {@code QUEUE_SIZE} items and then
 * polls the queue node once per second, adding as many items as are missing
 * to get back to {@code QUEUE_SIZE}. The constructor therefore blocks: it only
 * returns when the calling thread is interrupted or when the queue could not
 * be started.
 *
 * <p>Item payloads are the decimal string of a running counter ({@link #items}).
 */
public class QueueProducer implements Closeable {
	
	final static Logger LOG = LoggerFactory.getLogger(QueueProducer.class);
	
	// NOTE(review): SimpleDateFormat is not thread-safe. Within this class it is
	// only used from the thread running the constructor; confirm no other class
	// in the package formats with it concurrently.
	final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
	
	/** The queue items are put into; {@code null} if it could not be created/started. */
	DistributedQueue<CrawlUrl> queue;
	
	/** Number of children the queue node is refilled up to. */
	private static final int QUEUE_SIZE = 2000;
	
	/** Pause between two checks of the queue node, in milliseconds. */
	private static final long POLL_INTERVAL_MS = 1000L;
	
	/** Running counter used as the payload of the next item. */
	int items;
	
	// Not used anywhere in this class; kept because it is package-visible.
	int cnt;

	/**
	 * Creates and starts the distributed queue, performs the initial fill and
	 * then keeps refilling the queue until the calling thread is interrupted.
	 *
	 * <p>If the queue cannot be created or started the error is logged and the
	 * constructor returns immediately, leaving {@link #queue} {@code null}.
	 *
	 * @param framework the Curator client used to create the queue and to
	 *                  inspect the queue node; the visible caller starts it
	 *                  before passing it in
	 */
	public QueueProducer(final CuratorFramework framework) {
		announceRole();

		try { 
			this.queue = Utils.newDistributedQueue(framework,
					Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, null);
			this.queue.start();
		} catch (Exception e) { 
			LOG.error("Error when starting the queue - " + e, e);
			// Without a working queue nothing can be produced: release whatever
			// was created and bail out instead of failing on every single put.
			closeQuietly(this.queue);
			this.queue = null;
			return;
		}

		addQueueContent(QUEUE_SIZE);
		
		System.out.println("Done with the initial init");

		keepQueueFilled(framework);
	}

	/** Logs and prints the local host name so the role of this process is visible. */
	private static void announceRole() {
		try {
			// Resolve the host name once and reuse it for both outputs.
			String message = java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer";
			LOG.info(message);
			System.out.println(message);
		} catch (Exception e) {
			// Purely informational; failing to resolve the host must not stop the producer.
			LOG.warn("Could not determine the local host name - " + e, e);
		}
	}

	/**
	 * Polls the queue node every {@code POLL_INTERVAL_MS} and adds the items
	 * missing to reach {@code QUEUE_SIZE}. Returns when the current thread is
	 * interrupted; any other failure is logged and the loop carries on.
	 *
	 * @param framework the Curator client used to read the queue node
	 */
	private void keepQueueFilled(final CuratorFramework framework) {
		while (!Thread.currentThread().isInterrupted()) {
			try {
				Thread.sleep(POLL_INTERVAL_MS);
				Stat stat = framework.checkExists().watched().forPath(Utils.CRAWL_QUEUE_PATH);
				// checkExists() yields null when the node does not exist; treat
				// that the same as an empty queue instead of failing with an NPE.
				int numChildren = (stat == null) ? 0 : stat.getNumChildren();
				if (numChildren == 0) {
					LOG.info(dateFormat.format(new Date()) + " ERROR!!! The queue was empty!!");
				}
				LOG.info(dateFormat.format(new Date()) + " numChildren = "+ numChildren);
				addQueueContent(QUEUE_SIZE - numChildren);
			} catch (InterruptedException e) {
				// Restore the flag so callers can see the interruption, then stop.
				Thread.currentThread().interrupt();
				LOG.info("QueueProducer interrupted, stopping the refill loop");
				return;
			} catch (Exception e) {
				LOG.error("Error while refilling the queue - " + e, e);
			}
		}
	}

	/**
	 * Puts {@code numberOfItems} new items on the queue and logs how long it took.
	 * A value of zero or less adds nothing. A failure on one item is logged and
	 * the remaining items are still attempted; an interruption stops the batch.
	 *
	 * @param numberOfItems how many items to add
	 */
	void addQueueContent(int numberOfItems) {
		if (this.queue == null) {
			LOG.error("Cannot add " + numberOfItems + " items: the queue was not started");
			return;
		}
		long time = System.currentTimeMillis();
		for (int i = 0; i < numberOfItems; i++) {
			try {
				CrawlUrl url = new CrawlUrl(""+this.items++);
				this.queue.put(url);
			} catch (InterruptedException e) {
				// Leave the flag set so the refill loop terminates as well.
				Thread.currentThread().interrupt();
				LOG.info("Interrupted while adding the item " + i + " in the addQueueContent()");
				return;
			} catch (Exception e) {
				LOG.error("Caught an error when adding the item " + i + " in the addQueueContent()", e);
			}
		}
		LOG.info(dateFormat.format(new Date()) + " - addQueueContent " + numberOfItems + " - it took " + (System.currentTimeMillis() -time) + " msecs");
	}
	
	/**
	 * Starts a producer against either an in-process test ZooKeeper server or
	 * the server named in the property file, depending on the properties.
	 *
	 * @param args {@code args[0]} is passed to {@link CrawlerPropertyFile}
	 */
	public static void main(String[] args) {
		if (args == null || args.length < 1) {
			System.err.println("Usage: QueueProducer <crawler-property-file>");
			return;
		}

		TestingServer server = null;
		CuratorFramework framework = null;
		QueueProducer producer = null;
		try {
			CrawlerPropertyFile props = new CrawlerPropertyFile(args[0]);

			final String connectString;
			System.out.println("DEBUG = " + Utils.DEBUG);
			if (props.useZkTestServer()) {
				System.out.println("Will launch from zkTestServer");
				server = new TestingServer();
				connectString = server.getConnectString();
			} else {
				connectString = props.getZkServer();
			}

			framework = Utils.newFramework(connectString);
			framework.start();

			// Blocks until the producer is interrupted or fails to start its queue.
			producer = new QueueProducer(framework);
		} catch (Exception e) {
			LOG.error("QueueProducer failed - " + e, e);
			System.err.println("QueueProducer failed - " + e);
		} finally {
			// Release in reverse order of creation.
			closeQuietly(producer);
			closeQuietly(framework);
			closeQuietly(server);
		}

	}

	/**
	 * Closes the underlying queue. Does nothing if the queue was never created.
	 *
	 * @throws IOException if closing the queue fails
	 */
	@Override
	public void close() throws IOException {
		if (this.queue != null) {
			this.queue.close();
		}
	}

	/** Closes {@code resource} if it is non-null, logging instead of propagating failures. */
	private static void closeQuietly(Closeable resource) {
		if (resource == null) {
			return;
		}
		try {
			resource.close();
		} catch (IOException e) {
			LOG.warn("Error while closing " + resource.getClass().getSimpleName() + " - " + e, e);
		}
	}

}
