import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMemoryMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.fair.FairAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import static org.apache.ignite.events.EventType.EVTS_DISCOVERY;

/**
 * Created by alpert on 08/11/2016.
 */
public class Main {
    private static String cacheName = "EXAMPLE";
    private static String cacheName2 = "EXAMPLE2";

    public static void main(String[] args) throws InterruptedException, IOException {
        IgniteConfiguration igniteConfiguration = new IgniteConfiguration();
        igniteConfiguration.setPeerClassLoadingEnabled(false)
                .setMetricsLogFrequency(0)
                .setIncludeEventTypes(EVTS_DISCOVERY);
        TcpDiscoverySpi tcpDiscoverySpi = new TcpDiscoverySpi();
        TcpDiscoveryMulticastIpFinder tcpDiscoveryMulticastIpFinder = new TcpDiscoveryMulticastIpFinder();
        tcpDiscoveryMulticastIpFinder.setMulticastGroup("228.10.10.142");
        tcpDiscoveryMulticastIpFinder.setMulticastPort(47400);
        tcpDiscoverySpi.setIpFinder(tcpDiscoveryMulticastIpFinder);
        igniteConfiguration.setDiscoverySpi(tcpDiscoverySpi);

        Ignite ignite = Ignition.start(igniteConfiguration);


        int nodeIdentifier = Integer.parseInt(args[0]);
        if (nodeIdentifier == 0) {
            System.out.println("Creating cache: " + cacheName);
            CacheConfiguration configuration = new CacheConfiguration();
            configuration.setAtomicityMode(CacheAtomicityMode.ATOMIC)
                    .setCacheMode(CacheMode.PARTITIONED)
                    .setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED)
                    .setBackups(1)
                    .setAffinity(new FairAffinityFunction(128))
                    .setOffHeapMaxMemory(0)
                    .setWriteBehindEnabled(false)
                    .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_ASYNC)
                    .setEagerTtl(false)
                    .setWriteBehindFlushFrequency(30 * 1000)
                    .setWriteBehindBatchSize(1000)
                    .setWriteBehindFlushThreadCount(1)
                    .setStartSize(250000)
                    .setName(cacheName)
                    .setRebalanceThrottle(100)
                    .setRebalanceBatchSize(2 * 1024 * 1024)
                    .setRebalanceMode(CacheRebalanceMode.ASYNC)
                    .setCopyOnRead(false)
                    .setStatisticsEnabled(true)
                    .setWriteThrough(false);

            System.out.println("Creating cache conf: " + configuration);
            IgniteCache cache = ignite.getOrCreateCache(configuration);
        } else if (nodeIdentifier == 1) {
            System.out.println("Creating cache: " + cacheName2);
            CacheConfiguration configuration2 = new CacheConfiguration();
            configuration2.setAtomicityMode(CacheAtomicityMode.ATOMIC)
                    .setCacheMode(CacheMode.PARTITIONED)
                    .setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED)
                    .setRebalanceMode(CacheRebalanceMode.SYNC)
                    .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
                    .setRebalanceThrottle(100)
                    .setRebalanceBatchSize(2 * 1024 * 1024)
                    .setBackups(1)
                    .setAffinity(new FairAffinityFunction(128))
                    .setName(cacheName2)
                    .setEagerTtl(false);
            System.out.println("Creating cache conf: " + configuration2);
            IgniteCache router = ignite.getOrCreateCache(configuration2);
        }

        while (ignite.cluster().forServers().nodes().size() < 2) {
            System.out.println("Waiting other node to connect..");
            Thread.sleep(1000);
        }

        Thread.sleep(10000);

        for (int i = 0; i < Integer.parseInt(args[1]); i++) {
            UUID uuid = ignite.affinity(cacheName).mapKeyToNode(i).id();
            UUID uuid1 = ignite.affinity(cacheName2).mapKeyToNode(i).id();
            if (uuid != uuid1) {
                System.out.println("Different node for id: " + i);
                System.out.println(uuid);
                System.out.println(uuid1);
            }

        }

        System.out.println("Finished 1");

}
