import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.queue.DistributedPriorityQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.RetryOneTime;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

/*
 *  Simple example that shows in code what I'd like to get wired in Spring:
 *  1. One CuratorFramework for whole app
 *  2. One DistributedPriorityQueue for whole app
 *  3. Lots of threads dequeuing items and working them in parallel.
 *  
 *  Actual output: only one thread used, total runtime is about 3 * 6 = 18 seconds
 *      Thread SimpleAsyncTaskExecutor-1 dequeued one
 *      Thread SimpleAsyncTaskExecutor-1 dequeued two
 *      Thread SimpleAsyncTaskExecutor-1 dequeued three
 *      Thread SimpleAsyncTaskExecutor-1 dequeued four
 *      Thread SimpleAsyncTaskExecutor-1 dequeued five
 *      Thread SimpleAsyncTaskExecutor-1 dequeued six
 *
 * Expected output: separate thread for each dequeue, total runtime around 3 seconds (all happen in parallel)
 *      Thread SimpleAsyncTaskExecutor-1 dequeued one
 *      Thread SimpleAsyncTaskExecutor-2 dequeued two
 *      Thread SimpleAsyncTaskExecutor-3 dequeued three
 *      Thread SimpleAsyncTaskExecutor-4 dequeued four
 *      Thread SimpleAsyncTaskExecutor-5 dequeued five
 *      Thread SimpleAsyncTaskExecutor-6 dequeued six
 */
/**
 * Demo that wires one {@link CuratorFramework} client and one
 * {@link DistributedPriorityQueue}, enqueues six messages and prints which
 * thread handles each of them.
 *
 * <p>The class acts as both the queue's consumer and its serializer, so the
 * same instance is handed to the {@link QueueBuilder} twice.
 */
public class AsyncAlwaysUsesOneThread implements QueueConsumer<String>,
												 QueueSerializer<String> {

	/**
	 * Launches the demo.
	 *
	 * @param args ignored
	 * @throws Exception if connecting, queueing or reading standard input fails
	 */
	public static void main(String[] args) throws Exception {
		new AsyncAlwaysUsesOneThread().run();
	}

	/**
	 * Starts the client and the queue, enqueues the sample messages in one
	 * batch and then blocks until a line is read from standard input.
	 *
	 * <p>Both the queue and the client are closed (queue first) when this
	 * method returns or throws.
	 *
	 * @throws Exception if any Curator call or the read from standard input fails
	 */
	public void run() throws Exception {
		// SINGLE CONNECTION TO CURATOR (try-with-resources closes it on the way out)
		try (CuratorFramework client = CuratorFrameworkFactory.newClient("localhost", new RetryOneTime(1000))) {
			client.start();

			// CREATE QUEUE AND SET IT LISTENING FOR EVENTS
			try (DistributedPriorityQueue<String> queue = QueueBuilder
					.builder(client, this, this, "/queue")
					.executor(new SimpleAsyncTaskExecutor()) // Every dequeue should get a different thread
					.buildPriorityQueue(1)) {
				queue.start();

				// ADD N MESSAGES IN ONE OPERATION (queue is already listening for messages to arrive)
				// NOTE(review): putMulti appears to store the whole batch as a single queue
				// entry, so one consumer thread would drain all six items back to back; that
				// would explain the single-thread output described in the file header.
				// Enqueueing each item with its own put(...) call is presumably what yields
				// one thread per message. Confirm against the Curator queue documentation.
				List<String> messages = new ArrayList<>(Arrays.asList("one", "two", "three", "four", "five", "six"));
				queue.putMulti(
						// Hands out the next message, or null once the batch is exhausted.
						() -> messages.isEmpty() ? null : messages.remove(0),
						1200); // priority shared by every item in the batch

				// Keep the process alive until the user presses Enter so consumers can run.
				new BufferedReader(new InputStreamReader(System.in)).readLine();
			}
		}
	}

	/**
	 * Logs every connection state transition reported for the client.
	 *
	 * @param client   the client whose connection state changed
	 * @param newState the state the connection moved to
	 */
	@Override
	public void stateChanged(CuratorFramework client, ConnectionState newState) {
		System.out.println("Connection state changed: " + newState);
	}

	/**
	 * Encodes a message as UTF-8 bytes.
	 *
	 * <p>The charset is fixed rather than taken from the platform default so
	 * that producers and consumers running on differently configured JVMs
	 * agree on the byte representation.
	 *
	 * @param item the message to encode
	 * @return the UTF-8 encoding of {@code item}
	 */
	@Override
	public byte[] serialize(String item) {
		return item.getBytes(StandardCharsets.UTF_8);
	}

	/**
	 * Decodes a message previously produced by {@link #serialize(String)}.
	 *
	 * @param bytes UTF-8 encoded message bytes
	 * @return the decoded message
	 */
	@Override
	public String deserialize(byte[] bytes) {
		return new String(bytes, StandardCharsets.UTF_8);
	}

	/**
	 * Prints the handling thread's name and the message, then sleeps for three
	 * seconds to simulate slow work.
	 *
	 * @param message the dequeued message
	 * @throws Exception if the sleeping thread is interrupted
	 */
	@Override
	public void consumeMessage(String message) throws Exception {
		System.out.println("Thread " + Thread.currentThread().getName() + " dequeued " + message);
		Thread.sleep(3000); // 3 seconds
	}
}
