package com.wangsan.study.ignite;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.logger.slf4j.Slf4jLogger;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
import org.junit.Test;

/**
 * @author wangsan
 * @date 2018/08/23
 */
public class IgniteMultiThreadTest {
    private static final String CACHE_DATA_REGION_PERSISTENCE = "PERSISTENCE_REGION";
    private static final String stringCacheOneName = "StringCacheOneName";

    @Test
    public void testStartServer() throws IOException {
        IgniteConfiguration cfg = new IgniteConfiguration();
        cfg.setClientMode(false);
        cfg.setDiscoverySpi(tcpDiscoverySpi());
        //        cfg.setDataStorageConfiguration(dataStorageConfiguration());
        cfg.setConsistentId("server_test_1");
        cfg.setIgniteInstanceName("server_test_1");
        cfg.setGridLogger(new Slf4jLogger());

        Ignite ignite = Ignition.start(cfg);

        CacheConfiguration<String, String> cacheOneConfiguration = new CacheConfiguration<>(stringCacheOneName);
        //        cacheOneConfiguration.setDataRegionName(CACHE_DATA_REGION_PERSISTENCE);
        cacheOneConfiguration.setCacheMode(CacheMode.REPLICATED);
        IgniteCache<String, String> cacheOne = ignite.getOrCreateCache(cacheOneConfiguration);

        ignite.events().localListen(event -> {
            System.err.println("get event " + event.name() + " " + event);
            if (event instanceof DiscoveryEvent) {
                ClusterNode clusterNode = ((DiscoveryEvent) event).eventNode();

                String item = "event_" + clusterNode.consistentId();
                System.err.println("------  oldest node process the message : " + item);

                switch (event.type()) {
                    case EventType.EVT_NODE_JOINED:
                        System.err.println("------- do add " + item);
                        cacheOne.put(item, item);
                        break;
                    case EventType.EVT_NODE_FAILED:
                    case EventType.EVT_NODE_LEFT:
                        System.err.println("------- do remove " + item);
                        cacheOne.remove(item);
                        break;
                    default:
                        System.err.println("ignore discovery event:" + event);
                        break;
                }

            } else {
                return false;
            }

            return true;
        }, EventType.EVTS_DISCOVERY);

        IntStream.range(0, 10000).forEach(i -> {
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println();
            System.err.println("-------- " + cacheOne.size());
            System.err.println(ignite.cacheNames().stream().collect(Collectors.joining(",")));
        });

    }

    @Test
    public void testMultiClient() throws IOException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        IntStream.range(0, 2).forEach(i -> {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    testClient(i);
                }
            });
        });

        System.in.read();
    }

    @Test
    public void testClient() throws Exception {
        testClient(-100);
        System.in.read();
    }

    private void testClient(int index) {
        String name = "test_ignite_client_haha_" + index;
        System.out.println(name);

        IgniteConfiguration cfg = new IgniteConfiguration();
        cfg.setClientMode(false);
        cfg.setDiscoverySpi(tcpDiscoverySpi());
        //                    cfg.setDataStorageConfiguration(dataStorageConfiguration());
        cfg.setGridLogger(new Slf4jLogger());
        cfg.setIgniteInstanceName(name);
        cfg.setConsistentId(name);

        Ignite ignite = Ignition.start(cfg);
        System.out.println("------- started " + ignite.cluster().localNode().id());

        ignite.cluster().forServers().nodes()
                .forEach(n -> System.err.println(n.id() + "  " + n.consistentId()));
        ignite.cluster().active(true);

        IgniteCache<String, String> cacheOne = ignite.getOrCreateCache(stringCacheOneName);
        cacheOne.put(name, name);
    }

    public static class TestVO {
        private String id;
        private String name;
        private int age;

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public int getAge() {
            return age;
        }

        public void setAge(int age) {
            this.age = age;
        }
    }

    private IgniteDiscoverySpi tcpDiscoverySpi() {
        TcpDiscoverySpi spi = new TcpDiscoverySpi();
        TcpDiscoveryMulticastIpFinder ipFinder = new TcpDiscoveryMulticastIpFinder();
        ipFinder.setAddresses(Arrays.asList("127.0.0.1:47500..47509"));
        spi.setIpFinder(ipFinder);
        return spi;

        //        ZookeeperDiscoverySpi zkDiscoSpi = new ZookeeperDiscoverySpi();
        //        zkDiscoSpi.setZkConnectionString(
        //                "127.0.0.1:2181");
        //        zkDiscoSpi.setSessionTimeout(30_000);
        //        zkDiscoSpi.setZkRootPath("/apacheIgnite");
        //        zkDiscoSpi.setJoinTimeout(10_000);
        //
        //        return zkDiscoSpi;
    }

    private DataStorageConfiguration dataStorageConfiguration() {
        DataRegionConfiguration defaultRegion = new DataRegionConfiguration();
        defaultRegion.setName("DEFAULT_REGION");
        defaultRegion.setInitialSize(100 * 1024 * 1024);

        DataStorageConfiguration dataStorageConfiguration = new DataStorageConfiguration();
        dataStorageConfiguration.setDefaultDataRegionConfiguration(defaultRegion);

        // 持久化区域,每个模块必须有，否则node machine无法读取
        DataRegionConfiguration persistenceRegion = new DataRegionConfiguration();
        persistenceRegion.setName(CACHE_DATA_REGION_PERSISTENCE);
        persistenceRegion.setInitialSize(10 * 1024 * 1024);
        persistenceRegion.setPersistenceEnabled(false);
        dataStorageConfiguration.setDataRegionConfigurations(persistenceRegion);

        dataStorageConfiguration.setWalSegmentSize(10 * 1024 * 1024); // 没必要那么大

        return dataStorageConfiguration;
    }

}
