/**
 * (c) DICOS GmbH, 2021
 *
 * $Id$
 */

package de.dicos.cpcfe.ignite;

import java.io.File;
import java.util.Arrays;
import java.util.List;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.configuration.ConnectorConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.logger.slf4j.Slf4jLogger;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import de.dicos.cpcfe.CommonTestBase;
import de.dicos.cpcfe.common.cluster.NodeController;

/**
 * Integration test that starts two local Apache Ignite server nodes ("a" and
 * "b") on the loopback interface and checks that a value written to a
 * replicated cache through one node can be read through the other node.
 * <p>
 * Every node is configured with two data regions: a persistent default region
 * named {@link #PERSISTENT_REGION} and an additional non-persistent region
 * named {@link #IN_MEMORY_REGION}. The tests cover one cache in each region.
 *
 * @author sth
 */
public class IgniteClusterTest
{
	// /////////////////////////////////////////////////////////
	// Class Members
	// /////////////////////////////////////////////////////////
	/** Logger of this test class. */
	private static final Logger log = LoggerFactory.getLogger(IgniteClusterTest.class);
	
	/** Node "a", started by {@link #setUp()}. */
	private Ignite igA;

	/** Node "b", started by {@link #setUp()} after node "a". */
	private Ignite igB;

	/** Name of the data region with persistence enabled (the default region of each node). */
	public static final String PERSISTENT_REGION = "persistent";

	/** Name of the data region with persistence disabled. */
	public static final String IN_MEMORY_REGION = "inmemory";

	// /////////////////////////////////////////////////////////
	// Constructors
	// /////////////////////////////////////////////////////////
	/**
	 * Creates the test instance; the nodes are started in {@link #setUp()}.
	 */
	public IgniteClusterTest()
	{
	}

	// /////////////////////////////////////////////////////////
	// Methods
	// /////////////////////////////////////////////////////////
	/**
	 * Starts node "a" and then node "b", each with its own data directory
	 * below <code>target/</code>. Each directory is passed to
	 * {@link CommonTestBase#rmrf} before the node is started (presumably a
	 * recursive delete, so that no state of an earlier run is picked up).
	 * 
	 * @throws java.lang.Exception if a node cannot be started; the failure is
	 *         logged before it is rethrown
	 */
	@Before
	public void setUp()
		throws Exception
	{
		try {
			log.info(">>>> starting node A");
			File da = new File("target/idd-a");
			CommonTestBase.rmrf(da);
			igA = startNode("a", da, 47500, 47100, 11211, 10800);
			log.info(">>>> node A is running");

			// short pause between the two node starts (presumably lets node A
			// settle before node B joins - TODO confirm that it is still needed)
			Thread.sleep(1000);

			log.info(">>>> starting node B");
			File db = new File("target/idd-b");
			CommonTestBase.rmrf(db);
			igB = startNode("b", db, 47501, 47101, 11212, 10801);
			log.info(">>>> node B is running");
			
		} catch (Throwable x) {
			// log startup problems explicitly so that they show up in the test log
			log.error("unexpected exception", x);
			throw x;
		}
	}

	/**
	 * Stops all Ignite nodes running in this JVM, cancelling running jobs.
	 * 
	 * @throws java.lang.Exception declared for JUnit, not thrown by the code below
	 */
	@After
	public void tearDown()
		throws Exception
	{
		log.info(">>>> stopping all nodes");
		Ignition.stopAll(true);
	}


	/**
	 * Writes through node A into the persistent cache and reads the value
	 * through node B; both nodes obtain the cache via getOrCreateCache.
	 */
	@Test
	public void testPerm() throws InterruptedException
	{
		logTestStart();
		
		IgniteCache<String, String> kva = getKeyValue(igA);
		IgniteCache<String, String> kvb = getKeyValue(igB);
		
		kva.put("a", "aval");
		Assert.assertEquals("aval", kvb.get("a"));
	}

	/**
	 * Like {@link #testPerm()}, but node B only looks the cache up by name
	 * after node A has created it.
	 */
	@Test
	public void testPerm2() throws InterruptedException
	{
		logTestStart();
		
		IgniteCache<String, String> kva = getKeyValue(igA);
		IgniteCache<String, String> kvb = getKeyValue2(igB);
		
		kva.put("a", "aval");
		Assert.assertEquals("aval", kvb.get("a"));
	}

	/**
	 * Writes through node A into the in-memory cache and reads the value
	 * through node B; both nodes obtain the cache via getOrCreateCache.
	 */
	@Test
	public void testMem() throws InterruptedException
	{
		logTestStart();
		
		IgniteCache<String, String> kva = getInMemoryKeyValue(igA);
		IgniteCache<String, String> kvb = getInMemoryKeyValue(igB);
		
		kva.put("a", "aval");
		
		Thread.sleep(1000);
		
		// Note from 2021-02-16: in-memory regions did not seem to be replicated
		// when persistent and in-memory regions are configured together, and the
		// test was meant to be deactivated for that reason. The assertion below
		// is active, however, so the test does run.
		Assert.assertEquals("aval", kvb.get("a"));
	}

	/**
	 * Like {@link #testMem()}, but node B only looks the cache up by name
	 * after node A has created it.
	 */
	@Test
	public void testMem2() throws InterruptedException
	{
		logTestStart();
		
		IgniteCache<String, String> kva = getInMemoryKeyValue(igA);
		IgniteCache<String, String> kvb = getInMemoryKeyValue2(igB);
		
		kva.put("a", "aval");
		
		Thread.sleep(1000);
		
		// Note from 2021-02-16: in-memory regions did not seem to be replicated
		// when persistent and in-memory regions are configured together, and the
		// test was meant to be deactivated for that reason. The assertion below
		// is active, however, so the test does run.
		Assert.assertEquals("aval", kvb.get("a"));
	}

	/**
	 * Logs the name of the calling test method, prefixed with "#### ".
	 */
	private static void logTestStart()
	{
		// frame 0 is this helper, frame 1 is the test method that called it
		log.info("#### "+new Throwable().getStackTrace()[1].getMethodName());
	}

	/**
	 * Starts one Ignite node bound to 127.0.0.1 and requests the ACTIVE
	 * cluster state afterwards. Every port is configured with a port range
	 * of 2.
	 * 
	 * @param id            instance name of the node; also used as suffix of
	 *                      the consistent id ("testcluster-" + id)
	 * @param datadir       work directory of the node; created if missing
	 * @param port          local port of the TCP discovery SPI
	 * @param commPort      local port of the TCP communication SPI
	 * @param connectorPort port of the {@link ConnectorConfiguration}
	 * @param clientPort    port of the {@link ClientConnectorConfiguration}
	 * @return the started node
	 * @throws IllegalStateException if the work directory cannot be created
	 */
	private Ignite startNode(String id, File datadir, int port, int commPort, int connectorPort, int clientPort)
	{
		// mkdirs() also returns false when the directory already exists
		if (!datadir.mkdirs() && !datadir.isDirectory()) {
			throw new IllegalStateException("cannot create ignite data directory "+datadir);
		}
		String igniteDataDir = datadir.getAbsolutePath();
		
		DataRegionConfiguration persistentRegion = new DataRegionConfiguration()//
						.setName(PERSISTENT_REGION)//
						.setPersistenceEnabled(true);
		// sizes, eviction and checkpoint buffer of the in-memory region are not
		// set here, so whatever Ignite uses by default applies
		DataRegionConfiguration inMemoryRegion = new DataRegionConfiguration()//
						.setName(IN_MEMORY_REGION)//
						.setPersistenceEnabled(false);
		
		// discovery addresses handed to the static IP finder; 47500..47501 are
		// the discovery ports used by setUp() for the nodes "a" and "b"
		List<String> clusterAddresses = Arrays.asList("127.0.0.1:47500..47501", "127.0.0.1:47505..47506");
		
		IgniteConfiguration igniteCfg = new IgniteConfiguration()//
			.setIgniteInstanceName(id)//
			.setWorkDirectory(igniteDataDir)//
			.setConsistentId("testcluster-"+id)//
			.setGridLogger(new Slf4jLogger())//
			.setMetricsLogFrequency(0)//
			// persistent region is the default region, the in-memory region is
			// an additional one
			.setDataStorageConfiguration(new DataStorageConfiguration()//
				.setDefaultDataRegionConfiguration(persistentRegion)//
				.setDataRegionConfigurations(inMemoryRegion))//
			.setDiscoverySpi(new TcpDiscoverySpi()//
				.setIpFinder(new TcpDiscoveryVmIpFinder()//
					.setAddresses(clusterAddresses)
					.setShared(true))//
				.setLocalAddress("127.0.0.1")//
				.setLocalPort(port)//
				.setLocalPortRange(2))
			.setCommunicationSpi(new TcpCommunicationSpi()//
				.setMessageQueueLimit(1024)
				.setLocalAddress("127.0.0.1")
				.setLocalPort(commPort)
				.setLocalPortRange(2))//
			.setConnectorConfiguration(new ConnectorConfiguration()
				.setHost("127.0.0.1")
				.setPort(connectorPort)
				.setPortRange(2))
			.setClientConnectorConfiguration(new ClientConnectorConfiguration()
				.setPort(clientPort)
				.setPortRange(2));

		Ignite ignite = Ignition.start(igniteCfg);
		
		// NOTE(review): per the Ignite documentation a cluster with a persistent
		// region starts inactive, hence the explicit activation - confirm.
		ignite.cluster().state(ClusterState.ACTIVE);

		return ignite;
	}
	
	/**
	 * Returns the replicated cache "permkv" located in the persistent data
	 * region of this test, creating it if it does not exist yet.
	 * 
	 * @param ignite node through which the cache is accessed
	 * @return the cache
	 */
	private IgniteCache<String, String> getKeyValue(Ignite ignite)
	{
		// the region name must match a region configured in startNode(),
		// therefore the constant of this class is used
		return ignite.getOrCreateCache(new CacheConfiguration<String, String>()
				.setName("permkv")
				.setDataRegionName(PERSISTENT_REGION)
				.setCacheMode(CacheMode.REPLICATED)
				.setBackups(2));
	}

	/**
	 * Looks up the cache "permkv" by name without creating it.
	 * 
	 * @param ignite node through which the cache is accessed
	 * @return the result of {@link Ignite#cache(String)} for "permkv"
	 */
	private IgniteCache<String, String> getKeyValue2(Ignite ignite)
	{
		return ignite.cache("permkv");
	}

	/**
	 * Returns the replicated, FULL_SYNC cache "memkv" located in the
	 * in-memory data region of this test, creating it if it does not exist
	 * yet.
	 * 
	 * @param ignite node through which the cache is accessed
	 * @return the cache
	 */
	private IgniteCache<String, String> getInMemoryKeyValue(Ignite ignite)
	{
		// the region name must match a region configured in startNode(),
		// therefore the constant of this class is used
		return ignite.getOrCreateCache(new CacheConfiguration<String, String>()
				.setName("memkv")
				.setCacheMode(CacheMode.REPLICATED)
				.setDataRegionName(IN_MEMORY_REGION)
				.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
				.setBackups(2));
	}

	/**
	 * Looks up the cache "memkv" by name without creating it.
	 * 
	 * @param ignite node through which the cache is accessed
	 * @return the result of {@link Ignite#cache(String)} for "memkv"
	 */
	private IgniteCache<String, String> getInMemoryKeyValue2(Ignite ignite)
	{
		return ignite.cache("memkv");
	}



	// /////////////////////////////////////////////////////////
	// Inner Classes
	// /////////////////////////////////////////////////////////


}
