package net.test;

import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;

import javax.cache.Cache.Entry;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheMemoryMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.AffinityKey;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeTaskSplitAdapter;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;


/**
 * Demo that starts three Ignite nodes inside one JVM, fills two partitioned
 * caches with {@code Person} and {@code PersonDetail} entries keyed by
 * {@link AffinityKey}, and then runs a split compute task whose jobs query
 * the caches with node-local SQL queries.
 */
public class TestComputeTask {
	/** Name of the cache holding {@code Person} entries. */
	private static final String PERSON_CACHE = "PERSON_CACHE";

	/** Name of the cache holding {@code PersonDetail} entries. */
	private static final String DETAILS_CACHE = "DETAILS_CACHE";

	/** Address range handed to the static IP finder of every node. */
	private static final String DISCOVERY_ADDRESSES = "127.0.0.1:47500..47509";

	/** Number of entries loaded into each cache. */
	private static final int ENTRY_COUNT = 5000;

	/** Number of distinct equivalent ids (affinity keys) the entries are spread over. */
	private static final int EQUIVALENT_ID_COUNT = 10;

	/**
	 * Off-heap limit per cache: 2 GiB. The leading {@code 2L} forces long
	 * arithmetic; the same product computed in {@code int} overflows to a
	 * negative number.
	 */
	private static final long OFF_HEAP_MAX_BYTES = 2L * 1024 * 1024 * 1024;

	/**
	 * Executes example.
	 *
	 * @param args
	 *            Command line arguments, none required.
	 * @throws IgniteException
	 *             If example execution failed.
	 */
	public static void main(String[] args) throws IgniteException {
		try {
			Ignite one = startNode("node-grid-1");
			System.out.println("Node started");

			// The two extra nodes only need to be running; they are not addressed directly.
			startNode("node-grid-2");
			startNode("node-grid-3");

			IgniteCache<AffinityKey<String>, Person> pCache = one.getOrCreateCache(getPersonConfig());
			IgniteCache<AffinityKey<String>, PersonDetail> dCache = one.getOrCreateCache(getDetailsConfig());

			for (int i = 0; i < ENTRY_COUNT; i++) {
				Person p = new Person(String.valueOf(i), String.valueOf(i % EQUIVALENT_ID_COUNT), "Name" + i);
				pCache.put(new AffinityKey<>(p.getPersonId(), p.getEquivalentId()), p);
			}

			for (int i = 0; i < ENTRY_COUNT; i++) {
				PersonDetail d = new PersonDetail(String.valueOf(i), String.valueOf(i % EQUIVALENT_ID_COUNT),
						System.currentTimeMillis(), System.currentTimeMillis());
				dCache.put(new AffinityKey<>(d.getDetailId(), d.getEquivalentId()), d);
			}

			System.out.println();
			System.out.println("Compute task split example started.");

			// Execute task on the cluster and wait for its completion.
			int cnt = one.compute().execute(DistributeComputeTask.class, "");

			System.out.println();
			System.out.println(">>> Number of compute jobs completed: '" + cnt + "'.");
			System.out.println(">>> Check all nodes for output (this node is also part of the cluster).");
		} finally {
			// Failures propagate to the caller instead of being swallowed; whatever
			// nodes were started in this JVM are stopped either way.
			Ignition.stopAll(true);
		}
	}

	/**
	 * Starts one Ignite node with the given grid name, using TCP discovery with
	 * a static IP finder set to {@link #DISCOVERY_ADDRESSES}.
	 *
	 * @param gridName
	 *            Grid name of the node to start.
	 * @return The started node.
	 * @throws IgniteException
	 *             If the node fails to start.
	 */
	private static Ignite startNode(String gridName) throws IgniteException {
		TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
		ipFinder.setAddresses(Arrays.asList(DISCOVERY_ADDRESSES));

		TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
		discoSpi.setIpFinder(ipFinder);

		IgniteConfiguration config = new IgniteConfiguration();
		config.setGridName(gridName);
		config.setDiscoverySpi(discoSpi);

		return Ignition.start(config);
	}

	/**
	 * Task that creates one {@link LocalPersonScanJob} per unit of
	 * {@code gridSize} and reports how many job results reached the reducer.
	 */
	private static class DistributeComputeTask extends ComputeTaskSplitAdapter<String, Integer> {

		/**
		 * Returns the number of job results handed to the reducer.
		 *
		 * @param results
		 *            Results of the jobs created by {@code split}. The raw
		 *            {@code List} is kept as in the inherited erasure so no
		 *            additional type has to be imported.
		 * @return Size of {@code results}; never {@code null}, so callers may
		 *         safely unbox it.
		 */
		@Override
		@SuppressWarnings("rawtypes")
		public Integer reduce(List results) throws IgniteException {
			return results.size();
		}

		/**
		 * Creates {@code gridSize} identical jobs.
		 *
		 * @param gridSize
		 *            Number of jobs to create.
		 * @param arg
		 *            Task argument; not used.
		 * @return The created jobs.
		 */
		@Override
		protected Collection<? extends ComputeJob> split(int gridSize, String arg) throws IgniteException {
			Collection<ComputeJob> jobs = new LinkedList<>();
			for (int i = 0; i < gridSize; i++) {
				jobs.add(new LocalPersonScanJob());
			}
			return jobs;
		}
	}

	/**
	 * Job that runs a local SQL query over the person cache and, for every
	 * person found, a local SQL query over the details cache filtered by the
	 * person's equivalent id.
	 *
	 * Declared as a static nested class with its own injected {@link Ignite}
	 * so that it carries no reference to the task instance that created it.
	 */
	private static class LocalPersonScanJob extends ComputeJobAdapter {

		@IgniteInstanceResource
		private Ignite ignite;

		/**
		 * Walks the locally stored persons and their matching details.
		 *
		 * @return Always {@code true}.
		 */
		@Override
		public Object execute() throws IgniteException {
			IgniteCache<AffinityKey<String>, Person> cache = ignite.cache(PERSON_CACHE);
			IgniteCache<AffinityKey<String>, PersonDetail> detailsCache = ignite.cache(DETAILS_CACHE);

			try (QueryCursor<Entry<AffinityKey<String>, Person>> cursor = cache.query(
					new SqlQuery<AffinityKey<String>, Person>(Person.class, "from Person").setLocal(true))) {

				// for each person .. get all Person Details
				for (Entry<AffinityKey<String>, Person> e : cursor) {
					// NOTE(review): assumes PersonDetail exposes equivalentId as a
					// queryable SQL field - confirm against the PersonDetail class.
					try (QueryCursor<Entry<AffinityKey<String>, PersonDetail>> detailCursor = detailsCache.query(
							new SqlQuery<AffinityKey<String>, PersonDetail>(PersonDetail.class,
									"from PersonDetail where equivalentId = ?")
											.setLocal(true)
											.setArgs(e.getValue().getEquivalentId()))) {
						for (Entry<AffinityKey<String>, PersonDetail> d : detailCursor) {
							// TODO: for each person detail, compare the content with the
							// person content and update the status on the detail.

							// TODO: update each Person detail information.
						}
					}
				}
			}
			return true;
		}
	}

	/**
	 * Builds the configuration of the person cache. Person object holds the
	 * person information; the equivalent id is the affinity key.
	 *
	 * @return Configuration for the cache named {@link #PERSON_CACHE}.
	 */
	private static CacheConfiguration<AffinityKey<String>, Person> getPersonConfig() {
		return newCacheConfig(PERSON_CACHE, Person.class);
	}

	/**
	 * Builds the configuration of the details cache. Person detail object holds
	 * details of address like start date and end date. Person to person detail
	 * is one to many: a person will have many addresses with different start
	 * and end dates.
	 *
	 * @return Configuration for the cache named {@link #DETAILS_CACHE}.
	 */
	private static CacheConfiguration<AffinityKey<String>, PersonDetail> getDetailsConfig() {
		return newCacheConfig(DETAILS_CACHE, PersonDetail.class);
	}

	/**
	 * Builds the cache configuration shared by both caches: partitioned, no
	 * backups, off-heap tiered with a limit of {@link #OFF_HEAP_MAX_BYTES},
	 * copy-on-read disabled, statistics enabled, and SQL indexing of
	 * {@link AffinityKey} to the given value type.
	 *
	 * @param name
	 *            Cache name.
	 * @param valueType
	 *            Class of the cache values.
	 * @return The populated configuration.
	 */
	private static <V> CacheConfiguration<AffinityKey<String>, V> newCacheConfig(String name, Class<V> valueType) {
		CacheConfiguration<AffinityKey<String>, V> cfg = new CacheConfiguration<>();
		cfg.setName(name);
		cfg.setIndexedTypes(AffinityKey.class, valueType);
		cfg.setBackups(0);
		cfg.setCacheMode(CacheMode.PARTITIONED);
		cfg.setCopyOnRead(false);
		cfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED);
		cfg.setOffHeapMaxMemory(OFF_HEAP_MAX_BYTES);
		cfg.setStatisticsEnabled(true);

		return cfg;
	}
}
