import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
import static org.apache.ignite.configuration.DeploymentMode.SHARED;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.cache.Cache.Entry;
import javax.cache.configuration.Factory;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;

/**
 * Standalone harness that starts one local Ignite node, configures a partitioned atomic cache
 * backed by a dummy read-through/write-through store, and runs an {@link EntryProcessor} over a
 * fixed set of keys with {@code IgniteCache.invokeAll}.
 *
 * <p>It is launched through {@link #main(String[])} and does not use a test framework.
 *
 * @author mchelyada
 */
public class CacheEntryProcessorTest {

    /** Name of the cache that is configured on the node and later looked up by the test. */
    private static final String CACHE_NAME = "testCache";

    /** Name given to the grid started by the test. */
    private static final String GRID_NAME = "testGrid";

    /**
     * Builds the value used both by the dummy store and by the entry processor when an entry has
     * no value yet.
     *
     * @return a new, mutable list holding five {@code 1.0} elements
     */
    private static List<Double> initialValues() {
        return new ArrayList<Double>(Arrays.asList(1.0, 1.0, 1.0, 1.0, 1.0));
    }

    /**
     * Creates the configuration of a partitioned, atomic cache with primary write ordering,
     * primary-sync write synchronization and read-through/write-through enabled against a
     * {@link DummyCacheStore}.
     *
     * @param cacheName name to assign to the cache
     * @return the populated cache configuration
     */
    private CacheConfiguration<String, List<Double>> createCacheConfiguration(String cacheName) {
        CacheConfiguration<String, List<Double>> cacheConfiguration = new CacheConfiguration<String, List<Double>>();

        cacheConfiguration.setName(cacheName);
        cacheConfiguration.setCacheMode(PARTITIONED);
        cacheConfiguration.setAtomicityMode(ATOMIC);
        cacheConfiguration.setAtomicWriteOrderMode(PRIMARY);
        cacheConfiguration.setWriteSynchronizationMode(PRIMARY_SYNC);

        cacheConfiguration.setReadThrough(true);
        cacheConfiguration.setWriteThrough(true);

        // Fully parameterised factory: no raw types, hence no warning suppression is needed.
        Factory<DummyCacheStore> cacheStoreFactory =
                new FactoryBuilder.SingletonFactory<DummyCacheStore>(new DummyCacheStore());
        cacheConfiguration.setCacheStoreFactory(cacheStoreFactory);

        return cacheConfiguration;
    }

    /**
     * Creates the node configuration: the test cache, shared deployment mode, and TCP discovery
     * bound to {@code 127.0.0.1:47501} with a static IP finder listing ports 47501 and 47502.
     *
     * @param gridName name to assign to the grid
     * @return the populated grid configuration
     * @throws IgniteException declared by the original signature; kept for compatibility
     */
    private IgniteConfiguration getGridConfiguration(String gridName) throws IgniteException {
        IgniteConfiguration gridConfig = new IgniteConfiguration();
        gridConfig.setGridName(gridName);

        CacheConfiguration<String, List<Double>> cacheConfig = createCacheConfiguration(CACHE_NAME);
        gridConfig.setCacheConfiguration(cacheConfig);

        gridConfig.setLocalHost("127.0.0.1");
        gridConfig.setDeploymentMode(SHARED);

        TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
        ipFinder.setAddresses(Arrays.asList("127.0.0.1:47501", "127.0.0.1:47502"));

        TcpDiscoverySpi spi = new TcpDiscoverySpi();
        spi.setIpFinder(ipFinder);
        spi.setLocalAddress("127.0.0.1");
        spi.setLocalPort(47501);

        gridConfig.setDiscoverySpi(spi);

        return gridConfig;
    }

    /**
     * Starts the grid, invokes {@link DummyEntryProcessor} on three fixed keys of the test cache,
     * and stops the grid again.
     *
     * @throws Exception if the node cannot be started or if the processor failed for any key
     */
    public void doCacheEntryProcessor() throws Exception {
        IgniteConfiguration gridConfig = getGridConfiguration(GRID_NAME);
        Ignite grid = Ignition.start(gridConfig);

        try {
            final Set<String> testKeys = new HashSet<String>();
            testKeys.add("testKey_1");
            testKeys.add("testKey_2");
            testKeys.add("testKey_3");

            final IgniteCache<String, List<Double>> testCache = grid.cache(CACHE_NAME);
            EntryProcessor<String, List<Double>, List<Double>> entryProcessor = new DummyEntryProcessor();

            Map<String, EntryProcessorResult<List<Double>>> results = testCache.invokeAll(testKeys, entryProcessor);

            // Per the JCache contract a failure inside the processor is reported through
            // EntryProcessorResult.get() rather than thrown by invokeAll itself, so every
            // result is dereferenced to make such failures visible. The returned values are
            // not needed.
            if (results != null) {
                for (EntryProcessorResult<List<Double>> result : results.values()) {
                    result.get();
                }
            }
        } finally {
            // Always stop the node started above, even when the invocation fails.
            grid.close();
        }
    }

    /**
     * Cache store stub: every load yields the same initial list, writes and deletes are no-ops.
     */
    private static class DummyCacheStore extends CacheStoreAdapter<String, List<Double>> implements Serializable {

        private static final long serialVersionUID = 1L;

        /**
         * @param key requested key (ignored)
         * @return a new list of five {@code 1.0} values, regardless of the key
         */
        @Override
        public List<Double> load(String key) throws CacheLoaderException {
            return initialValues();
        }

        /** Intentionally does nothing. */
        @Override
        public void write(Entry<? extends String, ? extends List<Double>> entry) throws CacheWriterException {
            // no-op: the dummy store does not persist anything
        }

        /** Intentionally does nothing. */
        @Override
        public void delete(Object key) throws CacheWriterException {
            // no-op: the dummy store does not persist anything
        }
    }

    /**
     * Entry processor that initialises a missing value with five {@code 1.0} elements and
     * otherwise replaces the value with a copy in which every element is incremented by one.
     */
    private static class DummyEntryProcessor
            implements EntryProcessor<String, List<Double>, List<Double>>, Serializable {

        private static final long serialVersionUID = 1L;

        /**
         * @param entry the cache entry to update
         * @param arguments unused
         * @return always {@code null}; the test does not need a result
         */
        @Override
        public List<Double> process(MutableEntry<String, List<Double>> entry, Object... arguments) throws EntryProcessorException {
            List<Double> currentValue = entry.getValue();

            if (currentValue == null) {
                entry.setValue(initialValues());
            } else {
                // Build a new list instead of mutating the value obtained from the entry.
                List<Double> newValue = new ArrayList<Double>(currentValue.size());
                for (Double item : currentValue) {
                    newValue.add(item + 1);
                }
                entry.setValue(newValue);
            }

            // returning null for test because we don't need result
            return null;
        }
    }

    /**
     * Command-line entry point: runs {@link #doCacheEntryProcessor()} once and prints progress
     * messages to standard output.
     *
     * @param args unused
     * @throws Exception if the test run fails
     */
    public static void main(String[] args) throws Exception {
        System.out.println("Running cache entry test.");

        CacheEntryProcessorTest test = new CacheEntryProcessorTest();

        test.doCacheEntryProcessor();

        System.out.println("Test complete!");
    }
}
