package cacheMultiLockTest;

import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.Lock;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteLock;

/**
 * Test demonstrating that when a multiply held cache lock is unlocked once,
 * other nodes can lock it.
 * 
 * The parent starts an ignite node and creates a cache lock. It then takes the
 * lock twice.
 * 
 * It then creates a thread which starts a second node, gets a handle to the
 * same lock (proven by the fact that the set value can be read back) and tries
 * to take the lock.
 * 
 * As expected this blocks.
 * 
 * After waiting a while the parent unlocks just once. It still holds the lock.
 * 
 * Incorrectly the thread also unblocks. See unexpected messages starting with
 * 'ERROR'. Now both threads hold the same lock.
 */
public class CacheMultiLockTest {
	public final static String LOCKNAME = "LOCKNAME";
	public final static String CACHENAME = "CACHENAME";
	public final static int TESTVALUE = 123;

	/** Pause, in milliseconds, used to let the other side initialize or run. */
	private static final long PAUSE_MILLIS = 5000;

	/**
	 * Runs the scenario: start a node, take the cache lock twice, start a second
	 * node on another thread that tries to take the same lock, unlock once and
	 * then wait for the second thread.
	 *
	 * If the lock behaves as a re-entrant lock the final join never returns; any
	 * line starting with 'ERROR' marks unexpected behaviour.
	 *
	 * @param args unused.
	 */
	public static void main(String[] args) {
		System.out.println("Starting test");

		// Make a uniquely named ignite node.
		IgniteConfiguration config1 = new DefaultIgniteConfig();
		config1.setIgniteInstanceName("INSTANCE1");

		// Start node.
		Ignite node1 = Ignition.start(config1);

		try {
			// Get a reference to the cache
			IgniteCache<String, Integer> cache = node1.getOrCreateCache(CACHENAME);

			// Write a pattern to the location so we can confirm we are connected to
			// the same cache.
			cache.put(LOCKNAME, TESTVALUE);

			// Make a lock
			Lock lock = cache.lock(LOCKNAME);

			// Check is initially unlocked
			if (cache.isLocalLocked(LOCKNAME, true)) {
				System.out.println("Is initially locked local");
			}
			if (cache.isLocalLocked(LOCKNAME, false)) {
				System.out.println("Is initially locked remote");
			}

			// Create a remote thread. Its constructor starts the second node.
			TestThread thread = new TestThread(LOCKNAME);

			// Give thread a while to initialize.
			pause(PAUSE_MILLIS);

			// Take the lock twice
			lock.lock();
			lock.lock();

			// Check is now locked. This has to come after lock() has been called,
			// otherwise it reports a failure before the lock was ever requested.
			if (!cache.isLocalLocked(LOCKNAME, true)) {
				System.out.println("Could not take local lock.");
			}

			// Run thread. Will block on the lock.
			thread.start();

			// Give thread a while to run.
			pause(PAUSE_MILLIS);

			// Unlock once. Thread should not unblock
			System.out.println("About to unlock.");
			lock.unlock();

			// Confirm we also still hold the lock
			if (cache.isLocalLocked(LOCKNAME, true)) {
				System.out.println("We still hold the lock.");
			}

			System.out.println("About to join.");

			// Since the thread should still be blocked the following should wait
			// forever.
			try {
				thread.join();
			} catch (InterruptedException e) {
				System.out.println("Join threw");
				// Keep the interrupt visible to whoever runs after us.
				Thread.currentThread().interrupt();
			}

			// Only report termination if the thread really ended; an interrupted
			// join() leaves it blocked, which is the expected state.
			if (!thread.isAlive()) {
				System.out.println("ERROR : Thread has terminated.");
			}
		} finally {
			// Close node, also when something above threw.
			node1.close();
		}

		System.out.println("Test done");

		// Run off the end and allow the thread to tidy itself up.
	}

	/**
	 * Sleeps the calling thread, reporting (and preserving) an interrupt instead
	 * of propagating it.
	 *
	 * @param millis time to sleep in milliseconds.
	 */
	private static void pause(long millis) {
		try {
			Thread.sleep(millis);
		} catch (InterruptedException e) {
			System.out.println("Could not sleep.");
			Thread.currentThread().interrupt();
		}
	}

	/**
	 * Remote thread that takes a lock. Constructing it starts a second ignite
	 * node; running it tries to take the named cache lock through that node.
	 */
	private static class TestThread extends Thread {
		private final Ignite node2;
		private final String lockName;

		/**
		 * Starts the second ignite node on the calling thread.
		 *
		 * @param lockName cache key to lock when the thread runs.
		 */
		public TestThread(String lockName) {
			// Remember lock name.
			this.lockName = lockName;

			// Start a new, uniquely named, ignite node for the thread,
			IgniteConfiguration config2 = new DefaultIgniteConfig();
			config2.setIgniteInstanceName("INSTANCE2");

			// Start an ignite node. No caches required.
			node2 = Ignition.start(config2);
		}

		/**
		 * Takes the lock via the second node. Returning from lock() while the
		 * parent still holds the lock is the failure this test demonstrates.
		 */
		@Override
		public void run() {
			try {
				// Get a reference to the cache
				IgniteCache<String, Integer> cache = node2.getOrCreateCache(CACHENAME);

				// Make a lock
				Lock lock = cache.lock(lockName);

				// Read the value to check we are talking to the same cache. Kept
				// boxed so a missing entry is reported rather than throwing a
				// NullPointerException on unboxing.
				Integer value = cache.get(lockName);
				if (value == null || TESTVALUE != value) {
					System.out.println("Value is wrong = " + value);
				}

				// Take the lock. Should block forever.
				lock.lock();
				System.out.println("ERROR. Thread has returned from lock()");

				// Thread should still be alive and have taken the lock. Check it,
				// using the same key the lock was created for.
				if (cache.isLocalLocked(lockName, false)) {
					System.out.println("ERROR. Thread has lock");
				}

				System.out.println("Thread closing.");
			} finally {
				// Always stop the second node, even if something above threw.
				node2.close();
			}
		}
	}

	/**
	 * A simple, canned, ignite config: a server node using TCP discovery on
	 * localhost, a non-persistent data region and one replicated transactional
	 * cache named {@link #CACHENAME}.
	 */
	private static class DefaultIgniteConfig extends IgniteConfiguration {
		public DefaultIgniteConfig() {
			// Some basic config. Callers override the instance name afterwards.
			setIgniteInstanceName("IGNITE_LOCK_TEST");
			// Need a server to connect to. So be a server.
			setClientMode(false);
			setPeerClassLoadingEnabled(true);

			// Set up node discovery
			TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
			TcpDiscoveryMulticastIpFinder finder = new TcpDiscoveryMulticastIpFinder();
			List<String> addresses = Arrays.asList("127.0.0.1:47500..47502");
			finder.setAddresses(addresses);
			discoSpi.setIpFinder(finder);
			setDiscoverySpi(discoSpi);

			// Set up data storage
			DataStorageConfiguration storeConfig = new DataStorageConfiguration();
			DataRegionConfiguration regionConfig = storeConfig.getDefaultDataRegionConfiguration();
			regionConfig.setName("IGNITE_LOCK_REGION");

			// For locking should not need persistence.
			regionConfig.setPersistenceEnabled(false);
			setDataStorageConfiguration(storeConfig);

			// Set up a cache. Same for both instances.
			CacheConfiguration<String, Integer> cacheConfig = new CacheConfiguration<>();
			cacheConfig.setName(CACHENAME);

			cacheConfig.setCacheMode(CacheMode.REPLICATED);
			cacheConfig.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);

			// Not sure we need the following
			cacheConfig.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
			cacheConfig.setBackups(0);

			this.setCacheConfiguration(cacheConfig);
		}
	}
}
