package userlist.repro;

import java.io.Serializable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.configuration.FactoryBuilder;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.binary.BinaryBasicIdMapper;
import org.apache.ignite.binary.BinaryBasicNameMapper;
import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.BinaryConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.processors.cache.GridCacheTestStore;
import org.apache.ignite.lang.IgnitePredicate;

/**
 * Stand-alone reproducer that checks the old and new values carried by
 * {@link EventType#EVT_CACHE_OBJECT_PUT} events.
 * <p>
 * {@link #main(String[])} starts {@link #NODES_CNT} nodes, registers the same local listener on each
 * of them, and then updates a single key of a replicated cache three times, growing the trade size by
 * 10 on every update. The listener verifies that each event either has no old value and a new size of
 * 10, or has an old value whose size is exactly 10 less than the new one.
 * <p>
 * Checks are performed explicitly rather than with {@code assert}, so they are effective regardless of
 * the {@code -ea} JVM flag.
 */
public class PutEventsTest {
    /** Number of nodes started by {@link #main(String[])}. */
    public static final int NODES_CNT = 5;

    /** Number of puts performed by {@link #main(String[])}. */
    private static final int PUTS_CNT = 3;

    /** Maximum time to wait for all expected events, in seconds. */
    private static final long EVTS_TIMEOUT_SEC = 60;

    /**
     * Builds a node configuration with put/remove cache events enabled and a binary configuration
     * that uses basic id/name mappers and a non-compact footer.
     *
     * @param gridName Grid (node) name.
     * @return Node configuration.
     */
    static IgniteConfiguration getConfiguration(String gridName) {
        BinaryConfiguration binaryCfg = new BinaryConfiguration();

        binaryCfg.setCompactFooter(false);
        binaryCfg.setIdMapper(new BinaryBasicIdMapper(true));
        binaryCfg.setNameMapper(new BinaryBasicNameMapper(true));

        return new IgniteConfiguration()
            .setGridName(gridName)
            .setIncludeEventTypes(EventType.EVT_CACHE_OBJECT_PUT, EventType.EVT_CACHE_OBJECT_REMOVED)
            .setBinaryConfiguration(binaryCfg);
    }

    /**
     * Builds a configuration for a replicated cache with read-through and write-through enabled,
     * write-behind disabled and {@link GridCacheTestStore} as the cache store.
     *
     * @param cacheName Cache name.
     * @param <K> Cache key type.
     * @param <V> Cache value type.
     * @return Cache configuration.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static <K, V> CacheConfiguration<K, V> cacheConfiguration(String cacheName) {
        // The raw type is kept on purpose.
        // NOTE(review): presumably the test store's own type parameters do not match <K, V>, so a
        // parameterized configuration would not accept its factory - confirm before tightening.
        CacheConfiguration ccfg = new CacheConfiguration(cacheName)
            .setCacheMode(CacheMode.REPLICATED)
            .setAtomicWriteOrderMode(CacheAtomicWriteOrderMode.PRIMARY)
            .setBackups(0)
            .setWriteBehindEnabled(false)
            .setCacheStoreFactory(FactoryBuilder.factoryOf(GridCacheTestStore.class));

        ccfg.setReadThrough(true)
            .setWriteThrough(true);

        return ccfg;
    }

    /**
     * Runs the scenario: starts the grid, performs {@link #PUTS_CNT} puts on one key and waits until
     * {@code PUTS_CNT * NODES_CNT} put events have been observed by the local listeners.
     *
     * @param args Command line arguments (not used).
     * @throws AssertionError If an event carried unexpected values, or if not all expected events
     *      were observed within {@link #EVTS_TIMEOUT_SEC} seconds.
     * @throws Exception If the grid could not be started or a cache operation failed.
     */
    public static void main(String[] args) throws Exception {
        // The latch expects one event per put on every node.
        final CountDownLatch latch = new CountDownLatch(PUTS_CNT * NODES_CNT);

        // First failure seen by any listener; rethrown from the main thread after the wait.
        final AtomicReference<Throwable> failure = new AtomicReference<>();

        IgnitePredicate<CacheEvent> locLsnr = (CacheEvent evt) -> {
            try {
                checkEvent(evt);
            }
            catch (RuntimeException | AssertionError e) {
                // Record instead of propagating: an exception thrown from the listener would never
                // reach the main thread.
                failure.compareAndSet(null, e);
            }
            finally {
                // Always count the event, otherwise a failed check would leave main() waiting forever.
                latch.countDown();
            }

            return true;
        };

        try {
            // Start grid. Done inside the try block so that already started nodes are stopped
            // if a later node fails to start.
            for (int i = 0; i < NODES_CNT; i++)
                Ignition.start(getConfiguration("node-" + i)).events().localListen(locLsnr, EventType.EVT_CACHE_OBJECT_PUT);

            IgniteCache<TradeKey, Trade> cache = Ignition.ignite("node-0").getOrCreateCache(cacheConfiguration("MyCache"));
            final TradeKey key = new TradeKey(1);

            // Initial put.
            Trade trade1 = new Trade();
            trade1.setId(1);
            trade1.setSize(10);

            cache.put(key, trade1);

            // Re-put updated.
            trade1.setSize(20);

            cache.put(key, trade1);

            // Get-update-put.
            trade1 = cache.get(key);
            trade1.setSize(30);
            cache.put(new TradeKey(trade1.getId()), trade1);

            // Ensure all events were fired, but do not wait forever if some of them are lost.
            boolean allFired = latch.await(EVTS_TIMEOUT_SEC, TimeUnit.SECONDS);

            Throwable err = failure.get();

            if (err != null)
                throw new AssertionError("Put event carried unexpected values", err);

            if (!allFired)
                throw new AssertionError("Timed out waiting for put events [missing=" + latch.getCount() + ']');

            cache.destroy();
        }
        finally {
            Ignition.stopAll(true);
        }
    }

    /**
     * Verifies the old and new values of a single put event.
     *
     * @param evt Event to check.
     * @throws AssertionError If the event values do not match the expected progression.
     */
    private static void checkEvent(CacheEvent evt) {
        if (!evt.hasOldValue()) {
            if (evt.oldValue() != null)
                throw new AssertionError("Old value is present although hasOldValue() is false: " + evt);

            int newSize = ((Trade)evt.newValue()).getSize();

            if (newSize != 10)
                throw new AssertionError("Unexpected size for put without old value [new=" + newSize + ']');
        }
        else {
            int oldSize = ((Trade)evt.oldValue()).getSize();
            int newSize = ((Trade)evt.newValue()).getSize();

            if (oldSize + 10 != newSize)
                throw new AssertionError("Unexpected sizes [old=" + oldSize + ", new=" + newSize + ']');
        }
    }

    /** Cache value: a trade with an identifier and a size. */
    private static class Trade implements Serializable {
        /** Trade identifier. */
        int id;

        /** Trade size. */
        int size;

        /**
         * @return Trade identifier.
         */
        public int getId() {
            return id;
        }

        /**
         * @param id Trade identifier.
         */
        public void setId(int id) {
            this.id = id;
        }

        /**
         * @return Trade size.
         */
        public int getSize() {
            return size;
        }

        /**
         * @param size Trade size.
         */
        public void setSize(int size) {
            this.size = size;
        }
    }

    /** Cache key: wraps a trade identifier; equality and hash code are based on that identifier. */
    private static class TradeKey implements Serializable {
        /** Trade identifier. */
        int id;

        /**
         * @param id Trade identifier.
         */
        public TradeKey(int id) {
            this.id = id;
        }

        /** {@inheritDoc} */
        @Override public boolean equals(Object o) {
            if (this == o)
                return true;
            if (o == null || getClass() != o.getClass())
                return false;

            TradeKey key = (TradeKey)o;

            return id == key.id;
        }

        /** {@inheritDoc} */
        @Override public int hashCode() {
            return id;
        }
    }
}
