package cl.altiuz.reports.zmq;

import java.util.logging.Logger;

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;

/**
 * ZeroMQ request broker: accepts client requests on a ROUTER socket bound to
 * {@code tcp://*:5559} and hands them to worker threads through a DEALER
 * socket bound to {@code inproc://workers}, relaying replies back the other
 * way.
 */
public class Server {

	private static final int DEFAULT_WORKERS = 10;
	private static final Logger LOG = Logger.getLogger(Server.class.getName());
	private final int workers;

	/**
	 * Creates a broker that will start the given number of worker threads.
	 *
	 * @param workers number of {@code WorkerThread} instances to launch; must
	 *                not be negative
	 * @throws IllegalArgumentException if {@code workers} is negative
	 */
	public Server(final int workers) {
		if (workers < 0) {
			// Fail here rather than with NegativeArraySizeException in run(),
			// after sockets have already been bound.
			throw new IllegalArgumentException("workers must not be negative: " + workers);
		}
		this.workers = workers;
		System.out.println("Workers: " + workers);
	}

	/**
	 * Command-line entry point.
	 *
	 * @param args optional; {@code args[0]} is the number of worker threads
	 *             (defaults to {@value #DEFAULT_WORKERS})
	 */
	public static void main(final String[] args) {
		int workers = DEFAULT_WORKERS;
		if (args.length > 0) {
			// Only the parse is guarded, so the handler can safely read args[0].
			try {
				workers = Integer.parseInt(args[0]);
			} catch (NumberFormatException e) {
				System.err.println("Numero de workers erroneo: [" + args[0] + "]");
				return;
			}
			if (workers < 0) {
				System.err.println("Numero de workers erroneo: [" + args[0] + "]");
				return;
			}
		}
		final Server server = new Server(workers);
		server.run();
	}

	/**
	 * Binds the frontend and backend sockets, starts the worker threads and
	 * then relays messages between the two sockets until the current thread
	 * is interrupted or a poll/receive fails. Sockets and context are released
	 * on every exit path, including a failed bind.
	 */
	public void run() {
		final Context context = ZMQ.context(1);
		Socket frontend = null;
		Socket backend = null;
		try {
			// Prepare our sockets
			frontend = context.socket(ZMQ.ROUTER);
			backend = context.socket(ZMQ.DEALER);
			frontend.bind("tcp://*:5559");
			backend.bind("inproc://workers");

			// Workers are started only after the inproc endpoint is bound.
			final Thread[] threads = new Thread[workers];
			for (int i = 0; i < threads.length; i++) {
				threads[i] = new WorkerThread(i, context);
				threads[i].start();
			}
			System.out.println("launch and connect broker.");

			// Initialize poll set: index 0 = frontend, index 1 = backend
			final Poller items = context.poller(2);
			items.register(frontend, Poller.POLLIN);
			items.register(backend, Poller.POLLIN);

			// Switch messages between sockets
			while (!Thread.currentThread().isInterrupted()) {
				// NOTE(review): a negative result is treated as "poll failed"
				// (interrupt / context shutdown) - confirm against the ZeroMQ
				// binding in use; some versions throw instead.
				if (items.poll() < 0) {
					break;
				}
				if (items.pollin(0) && !relay(frontend, backend, "in-0")) {
					break;
				}
				if (items.pollin(1) && !relay(backend, frontend, "in-1")) {
					break;
				}
			}
		} finally {
			if (frontend != null) {
				frontend.close();
			}
			if (backend != null) {
				backend.close();
			}
			// NOTE(review): term() presumably waits for sockets opened by the
			// worker threads to be closed as well - verify WorkerThread
			// closes its socket when the context is terminated.
			context.term();
		}
	}

	/**
	 * Forwards one complete (possibly multipart) message from one socket to
	 * another, frame by frame, preserving the "more frames follow" flag.
	 *
	 * @param from  socket with a message ready to be received
	 * @param to    socket the frames are sent to
	 * @param label prefix used in the log lines for this direction
	 * @return {@code true} if the whole message was forwarded, {@code false}
	 *         if a receive returned no frame and relaying should stop
	 */
	private static boolean relay(final Socket from, final Socket to, final String label) {
		boolean more;
		do {
			final byte[] frame = from.recv(0);
			if (frame == null) {
				// A failed receive yields null; never pass that on to send().
				return false;
			}
			LOG.info(label + ": req rcvd");
			more = from.hasReceiveMore();

			// Broker it
			to.send(frame, more ? ZMQ.SNDMORE : 0);
			LOG.info(label + ": Rsp sent");
		} while (more);
		return true;
	}
}
