import java.util.concurrent.locks.Lock;

import javax.annotation.PostConstruct;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.transactions.Transaction;
import org.springframework.stereotype.Component;

class SessObject {
	String sessObjName = "SessObj_Name_Default";

	SessObject() {

	}

	SessObject(String sessObjName) {
		this.sessObjName = sessObjName;
	}

	public String getSessObjName() {
		return sessObjName;
	}

	public void setSessObjName(String sessObjName) {
		this.sessObjName = sessObjName;
	}

	@Override
	public String toString() {
		return sessObjName;
	}

}

class WriteThread1 extends Thread {

	public void run() {
		Lock lock =null;
		try {
		IgniteCacheLockPOC poc =  IgniteCacheLockPOC.getInstance();
		String sessObjCacheKey = "james";
		lock = poc.getLock(sessObjCacheKey);
		lock.lock();
		System.out.println("WriteThread1 running...");
		SessObject sessObj = new SessObject("james_Val_thread1"); 
		
		poc.putInSoCache(sessObjCacheKey, sessObj);   		
		System.out.println(System.currentTimeMillis() + ": WriteThread1 - inserted into cache: " + sessObjCacheKey + ":" + sessObj);

		
			Thread.sleep(1000);
		
		
			SessObject sessObjCache =  poc.getFromSoCache(sessObjCacheKey);
		System.out.println("got from the cache in thread1"+sessObjCache.getSessObjName());
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}finally {
			if(lock != null) {
				lock.unlock();
			}
		}
	}
}

class WriteThread2 extends Thread {
	public void run() {
		Lock lock =null;
		try {
		IgniteCacheLockPOC poc =  IgniteCacheLockPOC.getInstance();
		String sessObjCacheKey = "james";
		lock = poc.getLock(sessObjCacheKey);
		lock.lock();
		System.out.println("WriteThread2 running...");
		SessObject sessObj = new SessObject("james_Val_thread2"); 
		poc.putInSoCache(sessObjCacheKey, sessObj);   		
		System.out.println(System.currentTimeMillis() + ": WriteThread2 - inserted into cache: " + sessObjCacheKey + ":" + sessObj);

		
			SessObject sessObjCache =  poc.getFromSoCache(sessObjCacheKey);
			System.out.println("got from the cache in thread2"+sessObjCache.getSessObjName());
		}finally {
			if(lock != null) {
				lock.unlock();
			}
		}
	}
	}


@Component
public class IgniteCacheLockPOC {
	static Ignite ignite;

	private static final String SESSION_OBJECT_CACHE = "SoCache";
	private static IgniteCache<String, SessObject> soCache;
	private static final IgniteCacheLockPOC single = new IgniteCacheLockPOC();

	private IgniteCacheLockPOC() {
	}

	public Lock getLock(String sessObjCacheKey) {
		return soCache.lock(sessObjCacheKey);
	}

	@PostConstruct
	private void initCache() {
		CacheConfiguration<String, SessObject> soCacheConfig = new CacheConfiguration<>(SESSION_OBJECT_CACHE);
		soCacheConfig.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
		soCacheConfig.setCacheMode(CacheMode.PARTITIONED);
		soCacheConfig.setBackups(1);
		ignite = Ignition.start();
		soCache = ignite.getOrCreateCache(soCacheConfig);
	}

	public static IgniteCacheLockPOC getInstance() {
		return (single);
	}

	

	protected boolean cleanAll() {
		return false;
	}


	public void putInSoCache(String tranId, SessObject so) {
		if (soCache != null) {
			// System.out.println("put the SO in SO Cache "+so);
			soCache.put(tranId, so);
		}
	}

	public SessObject getFromSoCache(String tranId) {
		if (soCache != null) {
			SessObject so = soCache.get(tranId);
			// System.out.println("get the SO of transactionId= "+tranId+" from Cache "+so);
			return so;
		} else {
			return null;
		}
	}

	public boolean replace(String tranId, SessObject so) {
		if (soCache != null) {
			// System.out.println("replace the SO in SO Cache "+so);
			return soCache.replace(tranId, so);
		}
		return false;
	}

	public void removeSoCache(String tranId) {
		if (soCache != null) {
			// System.out.println("get the SO of transactionId= "+tranId+" from Cache ");
			soCache.remove(tranId);
		}
	}

	public static void main(String[] args) {
		IgniteCacheLockPOC igniteCacheTransPOC = getInstance();
		igniteCacheTransPOC.initCache();


		WriteThread1 wThread1 = new WriteThread1();
		wThread1.start();

		WriteThread2 wThread2 = new WriteThread2();
		wThread2.start();

	}

}
