package simplequeue;

import java.io.IOException;
import java.util.Random;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.zookeeper.data.Stat;



/**
 * Small benchmark around Curator's {@link DistributedQueue}: a producer puts
 * {@link #QUEUE_SIZE} items on the queue at {@link #QUEUE_PATH} and measures how long a given
 * number of consumers needs to drain them, with and without a lock path.
 *
 * <p>All the work of one round is done inside the constructor; {@link #main(String[])} runs one
 * round per consumer count and lock-path setting.
 */
public class QueueConsumerProducer {
	/** ZooKeeper path under which the queue items are stored. */
	public static final String QUEUE_PATH = "/queue";
	/** ZooKeeper path handed to {@code QueueBuilder.lockPath} when a round runs with a lock path. */
	public static final String LOCK_PATH = "/lock";

	/** Number of items the producer puts on the queue in each round. */
	public static final int QUEUE_SIZE = 100;

	/** Source of the simulated per-item processing time used by the consumers. */
	public static Random rand = new Random();

	/** Serializer that delegates to {@code SimpleQueueItem}'s own byte representation. */
	public static final QueueSerializer<SimpleQueueItem> SERIALIZER = new QueueSerializer<SimpleQueueItem>() {

		@Override
		public byte[] serialize(SimpleQueueItem item) {
			return item.serialize();
		}

		@Override
		public SimpleQueueItem deserialize(byte[] bytes) {
			return new SimpleQueueItem(bytes);
		}
	};

	/**
	 * Fills the queue and then blocks until it has been drained, printing the elapsed time.
	 * The whole job runs in the constructor.
	 */
	private static class Producer {

		/**
		 * Puts {@link #QUEUE_SIZE} items on the queue and waits until the queue node has no
		 * children left.
		 *
		 * @param framework started Curator client
		 * @param lockPath  whether the queue is built with {@link #LOCK_PATH}
		 */
		public Producer(CuratorFramework framework, boolean lockPath) {
			QueueBuilder<SimpleQueueItem> builder = QueueBuilder.builder(framework, null, SERIALIZER, QUEUE_PATH);
			if (lockPath) {
				builder.lockPath(LOCK_PATH);
			}

			DistributedQueue<SimpleQueueItem> queue;
			try {
				queue = builder.buildQueue();
				queue.start();
			} catch (Exception e) {
				// Without a started queue nothing can be produced, so there is nothing to measure.
				e.printStackTrace();
				return;
			}

			try {
				for (int i = 0; i < QUEUE_SIZE; i++) {
					try {
						queue.put(new SimpleQueueItem(i + ""));
					} catch (Exception e) {
						e.printStackTrace();
					}
				}

				// The clock starts once all items have been submitted.
				long time = System.currentTimeMillis();
				if (awaitEmptyQueue(framework)) {
					System.out.println("It took " + (System.currentTimeMillis() - time) + " msecs for consuming " + QUEUE_SIZE + " items");
				}
			} finally {
				try {
					queue.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}

		/**
		 * Polls the queue node every half second until it has no children.
		 *
		 * @return {@code true} when the queue was seen empty, {@code false} when the wait was
		 *         abandoned because the thread was interrupted or the node could not be read
		 */
		private static boolean awaitEmptyQueue(CuratorFramework framework) {
			try {
				while (countQueuedItems(framework) > 0) {
					Thread.sleep(500);
				}
				return true;
			} catch (InterruptedException e) {
				// Keep the interrupt visible to the caller instead of looping on.
				Thread.currentThread().interrupt();
				return false;
			} catch (Exception e) {
				// The node could not be read; continuing would poll on stale or missing data.
				e.printStackTrace();
				return false;
			}
		}

		/**
		 * Returns the number of children of the queue node, or 0 when the node does not exist
		 * ({@code checkExists()} yields {@code null} in that case).
		 */
		private static int countQueuedItems(CuratorFramework framework) throws Exception {
			Stat stat = framework.checkExists().forPath(QUEUE_PATH);
			return (stat == null) ? 0 : stat.getNumChildren();
		}
	}

	/**
	 * A queue consumer that simulates work by sleeping between 100 and 500 msecs per item.
	 * The queue is started in the constructor and must be released with {@link #close()}.
	 */
	private static class Consumer {

		/** The started queue, or {@code null} when building it failed. */
		private DistributedQueue<SimpleQueueItem> queue;

		/**
		 * Builds and starts a consuming queue on {@link #QUEUE_PATH}.
		 *
		 * @param framework started Curator client
		 * @param lockPath  whether the queue is built with {@link #LOCK_PATH}
		 * @param id        label printed with connection state changes
		 */
		public Consumer(CuratorFramework framework, boolean lockPath, final String id) {
			QueueConsumer<SimpleQueueItem> consumer = new QueueConsumer<SimpleQueueItem>() {
				@Override
				public void stateChanged(CuratorFramework client, ConnectionState newState) {
					System.out.println(String.format("[%s] connection state changed to %s", id, newState));
				}

				@Override
				public void consumeMessage(SimpleQueueItem item) throws Exception {
					try {
						// No real work: just sleep between 100 and 500 msecs.
						Thread.sleep(random(100, 500));
					} catch (InterruptedException e) {
						// Happens when the queue is closed mid-item; keep the flag set for the caller.
						Thread.currentThread().interrupt();
					}
				}
			};

			QueueBuilder<SimpleQueueItem> builder = QueueBuilder.builder(framework, consumer, SERIALIZER, QUEUE_PATH);
			if (lockPath) {
				builder.lockPath(LOCK_PATH);
			}

			try {
				queue = builder.buildQueue();
				queue.start();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}

		/** Closes the underlying queue, if one was built. */
		void close() {
			if (queue == null) {
				return;
			}
			try {
				queue.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	/**
	 * Returns a pseudo-random value in the inclusive range {@code [min, max]}.
	 *
	 * @param min lower bound (inclusive)
	 * @param max upper bound (inclusive); must not be smaller than {@code min}
	 */
	static long random(int min, int max) {
		return rand.nextInt((max - min) + 1) + min;
	}

	/**
	 * Runs one round: starts the consumers, lets the producer fill the queue and wait for it to
	 * drain, then closes the consumers' queues so they do not outlive the round.
	 *
	 * @param framework      started Curator client shared by producer and consumers
	 * @param numberOfThread number of consumers to start
	 * @param lockPath       whether the queues are built with {@link #LOCK_PATH}
	 */
	@SuppressWarnings("unused")
	public QueueConsumerProducer(CuratorFramework framework, int numberOfThread, boolean lockPath) {
		Consumer[] consumers = new Consumer[Math.max(0, numberOfThread)];
		try {
			// Consumers first, so they are ready when the producer starts putting items.
			for (int i = 0; i < consumers.length; i++) {
				consumers[i] = new Consumer(framework, lockPath, "id_" + i);
			}

			// The producer blocks until the queue has been drained.
			new Producer(framework, lockPath);
		} finally {
			for (Consumer consumer : consumers) {
				if (consumer != null) {
					consumer.close();
				}
			}
		}
	}

	/**
	 * Runs the benchmark for several consumer counts, first without and then with a lock path.
	 *
	 * @param args optional ZooKeeper connect string as first argument; when absent an embedded
	 *             {@link TestingServer} is started and used for all rounds
	 */
	@SuppressWarnings("unused")
	public static void main(String[] args) {
		String connectString = null;
		TestingServer server = null;
		if (args.length > 0) {
			connectString = args[0];
		} else {
			System.out.println("Will launch from zkTestServer");
			try {
				server = new TestingServer();
				connectString = server.getConnectString();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}

		try {
			if (connectString == null) {
				System.err.println("No ZooKeeper connect string available, aborting");
				return;
			}

			boolean[] lockPathes = {false, true};
			for (boolean lockPath : lockPathes) {
				System.out.println("\n\n");
				if (lockPath) {
					System.out.println("With LOCK_PATH");
				} else {
					System.out.println("Without LOCK_PATH");
				}
				int numberofConsumers[] = {1, 32, 16, 8, 4, 2};

				for (int nbreOfConsumer : numberofConsumers) {
					RetryPolicy retryPolicy = new ExponentialBackoffRetry(4000, 29);
					CuratorFramework framework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
					try {
						framework.start();
						System.out.println("-=-=-=-=-=-= " + nbreOfConsumer + " consumers =-=-=-=-=-=-");
						new QueueConsumerProducer(framework, nbreOfConsumer, lockPath);
					} finally {
						framework.close();
					}
				}
			}
		} finally {
			// The embedded server is shared by every round, so it is only stopped at the very end.
			if (server != null) {
				try {
					server.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}

}
