package curator.question;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class PathChildrenCacheListenerTest {

	private final Logger log = LoggerFactory.getLogger(PathChildrenCacheListenerTest.class);
	private static final String curatorPath = "/cu/ra/tor"; 
	private static final String curatorChildPath = curatorPath + "/child"; 
	private ScheduledExecutorService scheduledService = Executors.newScheduledThreadPool(1);


	public static void main(String[] args) throws Exception {
		new PathChildrenCacheListenerTest().start();
	}

	private void start() throws Exception{
		createNodeIfNotExists();
		startEventSender();
		startEventReceiver();
	}

	private void startEventSender() {
		scheduledService.scheduleWithFixedDelay(new EventSender(), 1, 1, TimeUnit.SECONDS);
	}
	
	private void startEventReceiver() {
		new EventReceiver().start();
	}

	private class EventSender implements Runnable{
		public void run() {
			try {
				CuratorClient.Instance.getFramework().setData().forPath(curatorChildPath, "SampleData".getBytes());
			} catch (Exception exception) {
				log.error("Exception while setting data  "  , exception);
			}
		}
	}

	private class EventReceiver implements PathChildrenCacheListener{
		private final PathChildrenCache pathCache;
		private final Logger log = LoggerFactory.getLogger(EventReceiver.class);

		private EventReceiver() {
			pathCache = new PathChildrenCache(CuratorClient.Instance.getFramework(), curatorPath , false);
		}

		private void start(){
			try {
				pathCache.start(StartMode.BUILD_INITIAL_CACHE);
				pathCache.getListenable().addListener(this);
			} catch (final Exception exception) {
				log.error("Exception while starting path cache", exception);
			}
			log.info("Created node cache for {}", curatorPath);
		}
		public void childEvent(CuratorFramework arg0,
				PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
			final String path = pathChildrenCacheEvent.getData().getPath();
			log.debug("Event Received for path {} " , path);
		}
	}

	private enum CuratorClient{
		Instance;
		CuratorFramework curatorFramework;
		private CuratorClient(){
			this.curatorFramework = CuratorFrameworkFactory.newClient("localhost:2181", 500, 500, new ExponentialBackoffRetry(1000, 3));
			this.curatorFramework.start();
			waitAbit();
		}
		private CuratorFramework getFramework(){
			return curatorFramework;
		}
		private void waitAbit() {
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
			}
		}
	}
	
	private void createNodeIfNotExists() throws Exception {
		final Stat stat = CuratorClient.Instance.getFramework().checkExists().forPath(curatorChildPath);
		if (stat == null) {
			CuratorClient.Instance.getFramework().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(curatorChildPath);
		}
	}

}
