package net.test.cs;

import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Arrays;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheMemoryMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.AffinityKey;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.jdbc2.JdbcConnection;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;

public class TestManager {

	public static void main(String[] args) throws Exception{
		
		// node 1
		IgniteConfiguration config = new IgniteConfiguration();
		config.setGridName("node-grid-1");
		
		TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
		TcpDiscoveryVmIpFinder ipfinder = new TcpDiscoveryVmIpFinder();
		ipfinder.setAddresses(Arrays.asList(new String[]{"127.0.0.1:47500..47509"}));
		
		discoSpi.setIpFinder(ipfinder);
		config.setDiscoverySpi(discoSpi);
		
		Ignite one = Ignition.start(config);
		
		
		// node 2
		IgniteConfiguration config1 = new IgniteConfiguration();
		
		TcpDiscoverySpi discoSpi1 = new TcpDiscoverySpi();
		TcpDiscoveryVmIpFinder ipfinder1 = new TcpDiscoveryVmIpFinder();
		ipfinder1.setAddresses(Arrays.asList(new String[]{"127.0.0.1:47500..47509"}));
		
		discoSpi1.setIpFinder(ipfinder1);
		config1.setDiscoverySpi(discoSpi1);
		config1.setGridName("node-grid-2");
		Ignite two = Ignition.start(config1);
		
		
		// node 3
		IgniteConfiguration config2 = new IgniteConfiguration();

		TcpDiscoverySpi discoSpi2 = new TcpDiscoverySpi();
		TcpDiscoveryVmIpFinder ipfinder2 = new TcpDiscoveryVmIpFinder();
		ipfinder2.setAddresses(Arrays.asList(new String[]{"127.0.0.1:47500..47509"}));
		
		discoSpi2.setIpFinder(ipfinder2);
		config2.setDiscoverySpi(discoSpi2);
		config2.setGridName("node-grid-3");
		Ignite three = Ignition.start(config2);
		
		
		IgniteCache<AffinityKey<String>, Person> pCache = two.getOrCreateCache(getPersonConfig());
		IgniteCache<AffinityKey<String>, PersonDetail> dCache = two.getOrCreateCache(getDetailsConfig());
		
		for (int i = 0; i < 5000; i++){
			Person p = new Person(String.valueOf(i), String.valueOf(i%10), "Name"+i);
			pCache.put(new AffinityKey<>(p.getPersonId(), p.getEquivalentId()), p);
		}
		
		for (int i = 0; i < 5000; i++){
			PersonDetail p = new PersonDetail(String.valueOf(i), String.valueOf(i%10), System.currentTimeMillis(), System.currentTimeMillis());
			dCache.put(new AffinityKey<>(p.getDetailId(), p.getEquivalentId()), p);
		}
		
		
		Class.forName("org.apache.ignite.IgniteJdbcDriver");
		JdbcConnection conn = (JdbcConnection)DriverManager.getConnection("jdbc:ignite:cfg://cache=PERSON_CACHE:collocated=false@file:C:/Anil/ignite-client.xml");
		
		System.out.println("***********************Conected to ignite cluster *************************************");	
		
//		String sql = "SELECT p.name as name, dupPerson.dupCount as count, pd.startDate as sdt, pd.endDate as edt  from PERSON_CACHE.PERSON p join (select equivalentId, count(*) dupCount from PERSON_CACHE.PERSON group by equivalentId ) dupPerson on p.equivalentId = dupPerson.equivalentId  join DETAILS_CACHE.PersonDetail pd on p.equivalentId = pd.equivalentId where p.personId = '100'";
		long start = System.currentTimeMillis();
		
		// fetch latest person details along for a person 
		String sql = "SELECT p.name as name, dupPerson.dupCount as count, pd.startDate as sdt, pd.endDate as edt  from PERSON_CACHE.PERSON p "
				+ "  join (select equivalentId, count(*) dupCount from PERSON_CACHE.PERSON group by equivalentId ) dupPerson on p.equivalentId = dupPerson.equivalentId" // to get the number of persons with same equivalent Id
				+ "  join DETAILS_CACHE.PersonDetail pd on p.equivalentId = pd.equivalentId" 
				+ "  join (select equivalentId, max(enddate) as enddate from DETAILS_CACHE.PersonDetail  group by equivalentId) maxPd on p.equivalentId = maxPd.equivalentId and maxPd.endDate = pd.endDate"
				+ "  where p.personId = '100'";
		PreparedStatement statement = conn.prepareStatement(sql);
		statement.setFetchSize(100);
		ResultSet rs = statement.executeQuery();

		long end = (System.currentTimeMillis() - start);
		int i = 0;
		while(rs.next()){
			System.out.println(rs.getString("name") + "-" +rs.getLong("COUNT")+ "-" +rs.getLong("sdt")+ "-" +rs.getLong("edt"));
			++i;
		}
		
		
		System.out.println("Count " + i);
		
		System.out.println("Time taken - "+ end);
	}
	
	
	/**
	 * Person object holds the person information. equivalent id is the affinity key
	 * 
	 * @return
	 */
	private static CacheConfiguration<AffinityKey<String>, Person> getPersonConfig(){
		CacheConfiguration<AffinityKey<String>, Person> pConfig = new CacheConfiguration<AffinityKey<String>, Person>();
		pConfig.setName("PERSON_CACHE");		
		pConfig.setIndexedTypes(AffinityKey.class, Person.class);
		pConfig.setBackups(0);
		pConfig.setCacheMode(CacheMode.PARTITIONED);
		pConfig.setCopyOnRead(false);
		pConfig.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED);
		pConfig.setOffHeapMaxMemory(1024 * 1024 * 1024 * 2);
		pConfig.setStatisticsEnabled(true);
		
		return pConfig;
	}
	
/*	
	private static CacheConfiguration<String, Person> getPersonConfig(){
		CacheConfiguration<String, Person> pConfig = new CacheConfiguration<String, Person>();
		pConfig.setName("PERSON_CACHE");		
		pConfig.setIndexedTypes(String.class, Person.class);
		pConfig.setBackups(0);
		pConfig.setCacheMode(CacheMode.PARTITIONED);
		pConfig.setCopyOnRead(false);
		pConfig.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED);
		pConfig.setOffHeapMaxMemory(1024 * 1024 * 1024 * 2);
		pConfig.setStatisticsEnabled(true);
		
		return pConfig;
	}

	*/
	/**
	 * Person detail object holds details of address like start date and end date.
	 * Person to person detail is one to many. a person will have many address with different start and end dates.
	 * 
	 * @return
	 */
	private static CacheConfiguration<AffinityKey<String>, PersonDetail> getDetailsConfig(){
		CacheConfiguration<AffinityKey<String>, PersonDetail> pConfig = new CacheConfiguration<AffinityKey<String>, PersonDetail>();
		pConfig.setName("DETAILS_CACHE");		
		pConfig.setIndexedTypes(AffinityKey.class, PersonDetail.class);
		pConfig.setBackups(0);
		pConfig.setCacheMode(CacheMode.PARTITIONED);
		pConfig.setCopyOnRead(false);
		pConfig.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED);
		pConfig.setOffHeapMaxMemory(1024 * 1024 * 1024 * 2);
		pConfig.setStatisticsEnabled(true);
		
		return pConfig;
	}

}
