import eu.rekawek.toxiproxy.Proxy;
import eu.rekawek.toxiproxy.ToxiproxyClient;
import eu.rekawek.toxiproxy.model.Toxic;
import eu.rekawek.toxiproxy.model.ToxicDirection;
import eu.rekawek.toxiproxy.model.toxic.Timeout;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.cache.CacheException;
import org.apache.commons.lang3.StringUtils;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.AddressResolver;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.jetbrains.annotations.NotNull;

/**
 * Manual harness that runs one Ignite server node and one Ignite client node in this JVM and
 * periodically disturbs the traffic between them with Toxiproxy toxics.
 * <p>
 * Two proxies are created: one listening on {@code localhost:47500} forwarding to the discovery
 * port {@code 47600}, and one listening on {@code localhost:47100} forwarding to the communication
 * port {@code 47200}. The client's IP finder points at the discovery proxy, and the server's
 * address resolvers map every non-zero port to {@code port - 100}, i.e. to the matching proxy port.
 * <p>
 * Requires a running Toxiproxy server reachable by a default-constructed {@link ToxiproxyClient}.
 * The program runs until it is killed or a Toxiproxy call fails.
 */
public class ToxiproxyTest {
    /** Host used for every node and proxy address. */
    private static final String HOST = "localhost";

    /** Local port configured on the discovery SPI (upstream of the discovery proxy). */
    private static final int DISCO_PORT = 47600;

    /** Local port configured on the communication SPI (upstream of the communication proxy). */
    private static final int COMM_PORT = 47200;

    /** Distance between a node port and the proxy port in front of it; also used by the address resolver. */
    private static final int PROXY_PORT_OFFSET = 100;

    /** Name of the Toxiproxy proxy in front of the discovery port. */
    private static final String DISCO_PROXY_NAME = "gg-disco-proxy";

    /** Name of the Toxiproxy proxy in front of the communication port. */
    private static final String COMM_PROXY_NAME = "gg-comm-proxy";

    /** Minimal banner width used by {@link #log(String)}. */
    private static final int MIN_BANNER_WIDTH = 42;

    /** Counter used to give every started node a unique instance name. */
    private static final AtomicLong id = new AtomicLong(0);

    /** Proxy in front of the discovery port; assigned by {@link #initToxiProxy()}. */
    private Proxy discoProxy;

    /** Proxy in front of the communication port; assigned by {@link #initToxiProxy()}. */
    private Proxy commProxy;

    /**
     * Entry point: creates the harness and runs it.
     *
     * @param args Ignored.
     * @throws Exception If proxy setup, node startup or a toxic operation fails.
     */
    public static void main(String[] args) throws Exception {
        new ToxiproxyTest().run();
    }

    /**
     * Sets up the proxies, starts the server and the client, starts the background put loop and
     * then cycles toxics forever.
     * <p>
     * If anything fails, the put thread is interrupted and both nodes are closed so the JVM is
     * able to exit instead of being kept alive by leftover non-daemon threads.
     *
     * @throws Exception If proxy setup, node startup or a toxic operation fails, or if the main
     *      thread is interrupted.
     */
    private void run() throws Exception {
        initToxiProxy();

        log("start server...");

        // The server is not used directly; it is held here only so it gets closed on exit.
        try (Ignite server = Ignition.start(cfg(false))) {
            log("server started.");

            Thread.sleep(2_000);

            // Optional scenario: uncomment to slow the links down before the client joins.
//            addLatency(5_000);

            log("start client...");

            try (Ignite client = Ignition.start(cfg(true))) {
                log("client started.");

                // Optional scenario: uncomment to slow the links down after the client joined.
//                addLatency(20_000);

                IgniteCache<Object, Object> cache = client.getOrCreateCache("MY_CACHE");

                Thread putter = startPutLoop(cache);

                try {
                    cycleToxics();
                }
                finally {
                    putter.interrupt();
                }
            }
        }
    }

    /**
     * Starts a thread that puts the same entry into the cache roughly once per second.
     * <p>
     * When a put fails with a {@link CacheException} caused by
     * {@link IgniteClientDisconnectedException}, the thread blocks on the reconnect future before
     * continuing. Any other {@link CacheException} is logged and the loop goes on. The thread
     * exits when it is interrupted.
     *
     * @param cache Cache to put into.
     * @return The started thread.
     */
    private Thread startPutLoop(IgniteCache<Object, Object> cache) {
        Thread putter = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    cache.put(1, 1);

                    log("put done");
                }
                catch (CacheException e) {
                    if (e.getCause() instanceof IgniteClientDisconnectedException) {
                        log("wait on future...");
                        ((IgniteClientDisconnectedException)e.getCause()).reconnectFuture().get();
                        log("reconnected");
                    }
                    else
                        log("put failed: " + e); // Keep retrying, but do not hide the failure.
                }

                try {
                    Thread.sleep(1_000);
                }
                catch (InterruptedException ignored) {
                    // Restore the flag and stop: an interrupt is the signal to shut the loop down.
                    Thread.currentThread().interrupt();

                    return;
                }
            }
        }, "cache-putter");

        putter.start();

        return putter;
    }

    /**
     * Endlessly repeats: wait 10 seconds, add the blocking toxics, wait 90 seconds, remove all
     * toxics. Only returns by throwing.
     *
     * @throws IOException If a Toxiproxy call fails.
     * @throws InterruptedException If the calling thread is interrupted while sleeping.
     */
    private void cycleToxics() throws IOException, InterruptedException {
        while (true) {
            Thread.sleep(10_000);

            addBlockToxic(1_000);

            Thread.sleep(90_000);

            removeToxics();
        }
    }

    /**
     * Prints a three-line banner to standard output: a top border starting with the current local
     * time, the centered message framed by {@code #}, and a bottom border.
     * <p>
     * The banner is written with a single {@code println} call so that banners produced
     * concurrently by the main thread and the put thread do not interleave line by line.
     *
     * @param s Message to print.
     */
    protected void log(String s) {
        int width = Math.max(s.length() + 2, MIN_BANNER_WIDTH);

        String now = LocalTime.now().toString();

        String banner = String.join(System.lineSeparator(),
            now + StringUtils.repeat("#", width - now.length()),
            "#" + StringUtils.center(s, width - 2) + "#",
            StringUtils.repeat("#", width));

        System.out.println(banner);
    }

    /**
     * Removes every toxic currently registered on the discovery and communication proxies.
     *
     * @throws IOException If a Toxiproxy call fails.
     */
    private void removeToxics() throws IOException {
        log("remove all toxics");

        for (Toxic toxic : discoProxy.toxics().getAll())
            toxic.remove();

        for (Toxic toxic : commProxy.toxics().getAll())
            toxic.remove();
    }

    /**
     * Adds an upstream latency toxic to both proxies. Currently only referenced from the
     * commented-out scenario lines in {@link #run()}.
     *
     * @param latency Latency value passed to Toxiproxy (milliseconds per the Toxiproxy docs).
     * @throws IOException If a Toxiproxy call fails.
     */
    private void addLatency(int latency) throws IOException {
        log("add latency toxic");

        discoProxy.toxics().latency("latency_disco", ToxicDirection.UPSTREAM, latency);
        commProxy.toxics().latency("latency_comm", ToxicDirection.UPSTREAM, latency);
    }

    /**
     * Adds a timeout toxic in both directions to both proxies.
     *
     * @param timeout Timeout value passed to Toxiproxy (milliseconds per the Toxiproxy docs, which
     *      describe this toxic as dropping all data and closing the connection after the timeout).
     * @throws IOException If a Toxiproxy call fails.
     */
    private void addBlockToxic(int timeout) throws IOException {
        log("add block toxic");

        discoProxy.toxics().timeout("blck_disco_up", ToxicDirection.UPSTREAM, timeout);
        discoProxy.toxics().timeout("blck_disco_down", ToxicDirection.DOWNSTREAM, timeout);
        commProxy.toxics().timeout("blck_comm_up", ToxicDirection.UPSTREAM, timeout);
        commProxy.toxics().timeout("blck_comm_down", ToxicDirection.DOWNSTREAM, timeout);
    }

    /**
     * (Re)creates the discovery and communication proxies, dropping any proxies with the same
     * names left over from a previous run.
     *
     * @throws IOException If a Toxiproxy call fails.
     */
    private void initToxiProxy() throws IOException {
        ToxiproxyClient toxiproxyClient = new ToxiproxyClient();

        discoProxy = recreateProxy(toxiproxyClient, DISCO_PROXY_NAME, DISCO_PORT);
        commProxy = recreateProxy(toxiproxyClient, COMM_PROXY_NAME, COMM_PORT);
    }

    /**
     * Deletes the proxy with the given name if it exists and creates a fresh one that listens on
     * {@code upstreamPort - PROXY_PORT_OFFSET} and forwards to {@code upstreamPort}.
     *
     * @param toxiproxyClient Client used to talk to the Toxiproxy server.
     * @param name Proxy name.
     * @param upstreamPort Port of the node endpoint the proxy forwards to.
     * @return The newly created proxy.
     * @throws IOException If a Toxiproxy call fails.
     */
    private static Proxy recreateProxy(ToxiproxyClient toxiproxyClient, String name, int upstreamPort)
        throws IOException {
        Proxy stale = toxiproxyClient.getProxyOrNull(name);

        if (stale != null)
            stale.delete(); // Remove the old one.

        return toxiproxyClient.createProxy(name,
            HOST + ":" + (upstreamPort - PROXY_PORT_OFFSET),
            HOST + ":" + upstreamPort);
    }

    /**
     * Builds the configuration for a server or a client node.
     * <p>
     * Both kinds use local discovery port {@code 47600} and local communication port {@code 47200}.
     * The server's IP finder lists the direct discovery address {@code localhost:47600}; the
     * client's lists the discovery proxy address {@code localhost:47500}. Only the server gets
     * address resolvers and the {@code TcpCommunicationSpi.comm.tcp.addrs} user attribute.
     *
     * @param client {@code true} to build a client-mode configuration, {@code false} for a server.
     * @return New configuration with a unique instance name.
     */
    private static IgniteConfiguration cfg(boolean client) {
        IgniteConfiguration igniteCfg = new IgniteConfiguration();

        igniteCfg.setNetworkTimeout(200);

        igniteCfg.setFailureDetectionTimeout(120_000);
        igniteCfg.setClientFailureDetectionTimeout(120_000);

        System.out.println("create cfg for client: " + client);
        igniteCfg.setClientMode(client);

        igniteCfg.setIgniteInstanceName((client ? "client_" : "server_") + id.getAndIncrement());

        igniteCfg.setLocalHost(HOST);

        // The client goes through the discovery proxy, the server uses the direct discovery port.
        int discoTargetPort = client ? DISCO_PORT - PROXY_PORT_OFFSET : DISCO_PORT;

        TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
        ipFinder.setAddresses(Collections.singletonList(HOST + ":" + discoTargetPort));

        TcpDiscoverySpi tcpDiscoverySpi = new TcpDiscoverySpi();
        tcpDiscoverySpi.setIpFinder(ipFinder);
        tcpDiscoverySpi.setLocalPort(DISCO_PORT);

        igniteCfg.setDiscoverySpi(tcpDiscoverySpi);

        TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
        commSpi.setLocalPort(COMM_PORT);

        igniteCfg.setCommunicationSpi(commSpi);

        if (!client) {
            // NOTE(review): presumably this blanks the server's advertised internal communication
            // addresses so that peers fall back to the resolver-provided (proxy) ones - confirm.
            Map<String, List<String>> userAttr = Collections.singletonMap("TcpCommunicationSpi.comm.tcp.addrs",
                Collections.emptyList());

            igniteCfg.setUserAttributes(userAttr);

            igniteCfg.setAddressResolver(getRslvr("[ignite]"));
            tcpDiscoverySpi.setAddressResolver(getRslvr("[disco]"));
            commSpi.setAddressResolver(getRslvr("[comm]"));
        }

        igniteCfg.setDataStorageConfiguration(
            new DataStorageConfiguration()
                .setDefaultDataRegionConfiguration(
                    new DataRegionConfiguration()
                        .setMaxSize(1024L * 1024 * 1024)
                        .setPersistenceEnabled(false)));

        return igniteCfg;
    }

    /**
     * Creates an address resolver that maps an address to the same host with the port lowered by
     * {@code PROXY_PORT_OFFSET} (port {@code 0} is kept as is), and prints every resolution.
     *
     * @param s Tag prepended to the printed resolution message.
     * @return New resolver.
     */
    @NotNull private static AddressResolver getRslvr(String s) {
        return new AddressResolver() {
            @Override public Collection<InetSocketAddress> getExternalAddresses(
                InetSocketAddress addr) throws IgniteCheckedException {
                int port = addr.getPort();

                // Port 0 is passed through unchanged; every other port is shifted to its proxy port.
                int mappedPort = port == 0 ? 0 : port - PROXY_PORT_OFFSET;

                List<InetSocketAddress> res = Collections.singletonList(
                    new InetSocketAddress(addr.getHostName(), mappedPort));

                System.out.println(Thread.currentThread().getName() + " " + s + "resolve: " + addr + " ->" + res);

                return res;
            }
        };
    }
}
