import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.ignite.DataRegionMetrics;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.binary.BinaryRawReader;
import org.apache.ignite.binary.BinaryRawWriter;
import org.apache.ignite.binary.BinaryReader;
import org.apache.ignite.binary.BinaryWriter;
import org.apache.ignite.binary.Binarylizable;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.BinaryConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.EventType;
import org.apache.ignite.plugin.segmentation.SegmentationPolicy;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;

public class Main {
	public static void main(String[] args) throws Exception {
		String[] cacheNames = { "TEST1", "TEST2", "TEST3" };
		
		IgniteConfiguration igniteConfiguration = new IgniteConfiguration();
		igniteConfiguration.setGridName("grid")
			.setSegmentationPolicy(SegmentationPolicy.NOOP)
			.setMetricsLogFrequency(0)
			.setPeerClassLoadingEnabled(false)
			.setIncludeEventTypes(EventType.EVTS_DISCOVERY);
		
		DataStorageConfiguration dataStorageConfiguration = new DataStorageConfiguration();
		
		DataRegionConfiguration dataRegionConfiguration = new DataRegionConfiguration();
		dataRegionConfiguration.setName("data_region")
			.setMaxSize(100 * 1024 * 1024)
			.setPersistenceEnabled(true)
			.setMetricsEnabled(true);
		
		dataStorageConfiguration.setDataRegionConfigurations(dataRegionConfiguration)
			.setWalHistorySize(10);
		
		igniteConfiguration.setDataStorageConfiguration(dataStorageConfiguration);
		
		BinaryConfiguration binaryConfiguration = new BinaryConfiguration();
		binaryConfiguration.setCompactFooter(false);
		
		igniteConfiguration.setBinaryConfiguration(binaryConfiguration);
		
		TcpDiscoverySpi tcpDiscoverySpi = new TcpDiscoverySpi();
		tcpDiscoverySpi.setReconnectCount(10)
			.setAckTimeout(5000)
			.setSocketTimeout(5000);
		
		TcpDiscoveryMulticastIpFinder ipFinder = new TcpDiscoveryMulticastIpFinder();
		ipFinder.setMulticastGroup("228.10.10.158")
			.setMulticastPort(47400);
		
		tcpDiscoverySpi.setIpFinder(ipFinder);
		
		igniteConfiguration.setDiscoverySpi(tcpDiscoverySpi);
		
		TcpCommunicationSpi tcpCommunicationSpi = new TcpCommunicationSpi();
		tcpCommunicationSpi.setMessageQueueLimit(1024)
			.setSlowClientQueueLimit(1024)
			.setSocketWriteTimeout(10000)
			.setConnectTimeout(10000);
		
		igniteConfiguration.setCommunicationSpi(tcpCommunicationSpi);
		
		List<IgniteCache> caches = new ArrayList<>();
		Ignite ignite = Ignition.start(igniteConfiguration);
		ignite.cluster().active(true);
		
		CacheConfiguration cacheConfiguration = new CacheConfiguration<>();
		cacheConfiguration.setRebalanceThrottle(100)
			.setRebalanceBatchSize(2 * 1024 * 1024)
			.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_ASYNC)
			.setRebalanceMode(CacheRebalanceMode.ASYNC)
			.setAtomicityMode(CacheAtomicityMode.ATOMIC)
			.setCacheMode(CacheMode.PARTITIONED)
			.setBackups(1)
			.setWriteThrough(false)
			.setWriteBehindEnabled(false)
			.setEagerTtl(false)
			.setCopyOnRead(false)
			.setDataRegionName("data_region")
			.setWriteBehindFlushFrequency(10000)
			.setWriteBehindBatchSize(10000)
			.setWriteBehindFlushThreadCount(1)
			.setWriteBehindFlushSize(1500000);
		
		RendezvousAffinityFunction affinityFunction = new RendezvousAffinityFunction(false, 1024);
		cacheConfiguration.setAffinity(affinityFunction);		
		
		for (String cacheName : cacheNames) {
			cacheConfiguration.setName(cacheName);
			caches.add(ignite.getOrCreateCache(cacheConfiguration));
		}

		System.out.println(caches.size() + " number of caches created.");

		while (true) {
			System.out.println("Adding elements beetween 0  and 500000 elements.");
			for (IgniteCache cache : caches) {
				for (int i = 0; i < 500000; i++) {
					Data data = new Data(i, "str1", "str2", "str3", 1L);
					cache.put(i, data);
				}
			}

			printCacheMetrics(ignite);

			System.out.println("Removing all");
			for (int j = 0; j < 10; j++) {
				for (IgniteCache cache : caches) {
					cache.removeAll();
				}
			}
			
			printCacheMetrics(ignite);
		}
	}

	public static void printCacheMetrics(Ignite ignite) {
		Collection<DataRegionMetrics> regionsMetrics = ignite.dataRegionMetrics();

		long pagesOnMemorySizes = 0;
		long pagesTotalSizes = 0;

		for (DataRegionMetrics rm : regionsMetrics) {
			if (rm.getName().equals("data_region")) {
				pagesOnMemorySizes += rm.getPhysicalMemorySize();
				pagesTotalSizes += rm.getTotalAllocatedSize();
			}
		}

		System.out.println("Cache size on memory: " + pagesOnMemorySizes
				+ " Bytes, total size: " + pagesTotalSizes + " Bytes");
	}
	
	static class Data implements Serializable, Binarylizable {
		/**
		 * 
		 */
		private static final long serialVersionUID = 1L;
		
		private String str1;
		private String str2;
		private String str3;
		private Long l1;
		private Integer i1;
		
		Data(int id, String s1, String s2, String s3, long l1) {
			this.str1 = s1;
			this.str2 = s2;
			this.str3 = s3;
			this.i1 = id;
			this.l1 = l1;
		}
		
		@Override
		public void readBinary(BinaryReader binaryReader) throws BinaryObjectException {
			 BinaryRawReader in = binaryReader.rawReader();
			this.str1 = in.readString();
			this.str2 = in.readString();
			this.str3 = in.readString();
			this.l1 = in.readLong();
			this.i1 = in.readInt();
		}
		@Override
		public void writeBinary(BinaryWriter binaryWriter) throws BinaryObjectException {
			BinaryRawWriter out = binaryWriter.rawWriter();
			out.writeString(str1);
			out.writeString(str2);
			out.writeString(str3);
			out.writeLong(l1);
			out.writeInt(i1);
			
		}
		
		
	}
	
	
	
	
}