package userlist.repro;

import java.util.Arrays;
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.fair.FairAffinityFunction;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;

/**
 */
public class PartitionDsitributionTest extends GridCommonAbstractTest {

    /** */
    private static final int PARTS = 31;

    /** */
    private static boolean VERBOSE = false;

    /** */
    protected CacheConfiguration cacheConfiguration(String cacheName) {
        return new CacheConfiguration(cacheName)
            .setCacheMode(CacheMode.PARTITIONED)
            .setBackups(1);
    }

    /** */
    public void testRendevouxAF() throws Exception {
        RendezvousAffinityFunction function = new RendezvousAffinityFunction();

        function.setPartitions(PARTS);

        checkAF(function);
    }

    /** */
    public void testFairAF() throws Exception {
        FairAffinityFunction function = new FairAffinityFunction();

        function.setPartitions(PARTS);

        checkAF(function);
    }

    /** */
    public void checkAF(AffinityFunction af) throws Exception {
        try {
            Ignite ignite = startGridsMultiThreaded(4, true);

            String firstCacheName = "firstCache";
            String secondCacheName = "secondCache";

            //create cache
            ignite.createCache(cacheConfiguration(firstCacheName).setAffinity(af));

            if (VERBOSE) {
                System.out.println("Before rebalancing.");
                printPartitionInfo(ignite, firstCacheName);
            }

            // start new node and create second cache
            startGrid("newNode");
            ignite.createCache(cacheConfiguration(secondCacheName).setAffinity(af));

            awaitPartitionMapExchange();

            if (VERBOSE) {
                System.out.println("Rebalanced cache.");
                printPartitionInfo(ignite, firstCacheName);

                System.out.println("New cache.");
                printPartitionInfo(ignite, secondCacheName);
            }

            checkPartitionDistribution(ignite, firstCacheName, secondCacheName);
        }
        finally {
            Ignition.stopAll(true);
        }
    }

    /** */
    private void checkPartitionDistribution(Ignite ignite, String fstName, String sndName) {
        Affinity<Object> fstAffinity = ignite.affinity(fstName);
        Affinity<Object> sndAffinity = ignite.affinity(sndName);

        for (ClusterNode node : ignite.cluster().nodes()) {
            int[] fstParts = fstAffinity.allPartitions(node);
            int[] sndParts = sndAffinity.allPartitions(node);

            Arrays.sort(fstParts);
            Arrays.sort(sndParts);

            assertEquals("Collocated partitions differs.", fstParts, sndParts);
        }

        for (ClusterNode node : ignite.cluster().nodes()) {
            int[] fstParts = fstAffinity.primaryPartitions(node);
            int[] sndParts = sndAffinity.primaryPartitions(node);

            Arrays.sort(fstParts);
            Arrays.sort(sndParts);

            assertEquals("Primary nodes differs.", fstParts, sndParts);
        }
    }

    /** */
    private void printPartitionInfo(Ignite ignite, String cacheName) {
        System.out.println("Partition mapping for cache: " + cacheName);

        Affinity<Object> affinity = ignite.affinity(cacheName);

        for (ClusterNode node : ignite.cluster().nodes()) {
            int[] primaryPartitions = affinity.primaryPartitions(node);
            int[] backupPartitions = affinity.backupPartitions(node);

            Arrays.sort(primaryPartitions);
            Arrays.sort(backupPartitions);

            System.out.println(
                node.id() +
                    ":\n\tprimary:" +
                    Arrays.toString(primaryPartitions) +
                    "\n\tbackup:" +
                    Arrays.toString(backupPartitions)
            );
        }

        System.out.println();
    }

    /** */
    private void assertEquals(String msg, int[] exp, int[] act) {
        if (Arrays.equals(exp, act))
            return;

        StringBuilder sb = new StringBuilder(msg);

        sb.append("\n\tExpected: " + Arrays.toString(exp));
        sb.append("\n\tActual: " + Arrays.toString(act)).append("\n");

        throw new AssertionError(sb.toString());
    }
}
