package com..hrl.zk;

import java.io.Closeable;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.state.ConnectionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com..hrl.crawler.CrawlUrl;
import com..hrl.crawler.Utils;

public class MyQueueConsumer implements Closeable{
	
	private DistributedQueue<CrawlUrl> queue;

	String name;
	
	final static Logger LOG = LoggerFactory.getLogger(MyQueueConsumer.class);
	
	final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
	
	ArrayBlockingQueue<CrawlUrl> blockingQueue;
	
	Set<InternalConsumer> workers;
	
	
	private class InternalConsumer extends Thread {
	
		private long lastPrintedTime;
		
		private static final long TEN_SECONDS = 10000;
		
		private int id;
		
		private StringBuilder stringBuilder;
		
		private Random rand;
		
		public InternalConsumer(int id ) {
			this.stringBuilder = new StringBuilder();
			this.rand = new Random(System.currentTimeMillis()* (id+1));
			this.id =id;
		}
		
		@Override
		public void run() {
			// Do the job
			
			while (true) {
				CrawlUrl item = null;
				if ((item = MyQueueConsumer.this.blockingQueue.poll()) != null) {
					this.stringBuilder.append(dateFormat.format(new Date(System.currentTimeMillis())) + "[id_"+this.id+ "-" + MyQueueConsumer.this.name+ "] processed " + item.url + "\n");
					if (printsToLog() && LOG.isInfoEnabled()) {
						LOG.info(this.stringBuilder.toString());
						this.stringBuilder.setLength(0);
					}

					long sleepingTime = getSleepingTime();
					try {
						Thread.sleep(sleepingTime);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		}

		private long getSleepingTime() {
			// We want to make a non uniform random sleep
			int key = this.rand.nextInt(10);
			switch (key){
				case 0:
				case 1:
				case 2:
				case 3:
				case 4:
					return random(10, 50); 
				case 5:
				case 6:
				case 7:
					return random(50, 500);
				default:
					return random(500, 3000);
			}
		}

		private long random(int min, int max) {
			return this.rand.nextInt((max - min) + 1) + min;
		}

		private boolean printsToLog() {
			if ((System.currentTimeMillis() - this.lastPrintedTime) > TEN_SECONDS) {
				this.lastPrintedTime = System.currentTimeMillis();
				return true;
			}
			return false;
		}
	}
	
	public MyQueueConsumer(CuratorFramework framework, int numWorkers) throws Exception {
		this.name = java.net.InetAddress.getLocalHost().getHostName();

		this.blockingQueue = new ArrayBlockingQueue<CrawlUrl>(numWorkers);
		
		this.workers = new HashSet<MyQueueConsumer.InternalConsumer>();
		for (int i = 0; i < numWorkers; i++) {
			InternalConsumer worker = new InternalConsumer(i);
			worker.start();
			this.workers.add(worker);
		}
		
		this.queue = Utils.newDistributedQueue(framework, Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, new QueueConsumer<CrawlUrl>() {

			@Override
			public void stateChanged(CuratorFramework client, ConnectionState newState) {
				System.out.println(String.format("[%s] connection state changed to %s", MyQueueConsumer.this.name, newState));
			}

			@Override
			public void consumeMessage(CrawlUrl url) throws Exception {
				try {
					//LOG.info(dateFormat.format(new Date(System.currentTimeMillis())) + "["+MyQueueConsumer.this.id+ "-" + MyQueueConsumer.this.name+ "] consumed " + url.url);
					MyQueueConsumer.this.blockingQueue.put(url);
				} catch (Exception e) {
					LOG.error( "["+MyQueueConsumer.this.name+ "]" + e.getMessage() + " for url " + url.url );
				} 
			}

		});
		try {
			this.queue.start();
		} catch (Exception e) {
			e.printStackTrace();
		}

	}
	

	@Override
	public void close() throws IOException {
		this.queue.close();
	}
}
