package test;

import java.util.List;
import java.util.Random;
import java.util.UUID;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.resources.IgniteInstanceResource;

/**
 * Standalone reproducer that fills a cache with {@link Person} entries and then runs a grouped
 * SQL query from every node of the cluster (via compute broadcast) while the cluster grows
 * from one node to three.
 * <p>
 * All checks are plain {@code assert} statements, so the JVM must be started with {@code -ea}
 * for them to have any effect.
 */
public class BroadcastSqlQuery {
    /** Number of entries put into the cache before the queries are run. */
    public static final int ENTRIES_COUNT = 100;

    /** Name of the cache that holds the {@link Person} entries. */
    public static final String CACHE_NAME = "person";

    /**
     * Names assigned to generated persons: the ASCII word "male" (written with Unicode escapes)
     * and a three-letter Arabic-script word, so that the query groups by non-ASCII text as well.
     */
    public static final String[] PERSON_NAMES = new String[] {"\u006d\u0061\u006c\u0065", "\u0645\u0631\u062f"};

    /**
     * Builds the configuration of the {@link #CACHE_NAME} cache.
     *
     * @return Cache configuration with {@code String} keys and {@link Person} values registered
     *      as indexed types, which makes {@code Person} queryable through SQL.
     */
    protected static CacheConfiguration<String, Person> cacheConfig() {
        return new CacheConfiguration<String, Person>(CACHE_NAME)
            .setIndexedTypes(String.class, Person.class);
    }

    /**
     * Builds a node configuration.
     *
     * @param name Grid name of the node; must be unique among nodes started in this JVM.
     * @return Node configuration that differs from the defaults only by its name.
     */
    protected static IgniteConfiguration config(String name) {
        return new IgniteConfiguration()
            .setGridName(name);
    }

    /**
     * Starts the first node, fills the cache and runs {@link MyTask} on the cluster; then starts
     * two more nodes one by one, re-running the task after each start. All nodes started in
     * this JVM are stopped at the end, whether or not the scenario succeeded.
     *
     * @param args Command line arguments (ignored).
     */
    public static void main(String[] args) {
        try {
            Ignite ignite = Ignition.start(config("node-0"));

            IgniteCache<String, Person> cache = ignite.getOrCreateCache(cacheConfig());

            fillCache(cache, ENTRIES_COUNT);

            int cacheSize = cache.size();

            assert ENTRIES_COUNT == cacheSize :
                "Unexpected cache size [expected=" + ENTRIES_COUNT + ", actual=" + cacheSize + ']';

            // Single-node cluster.
            ignite.compute().broadcast(new MyTask());

            // Two-node cluster.
            Ignition.start(config("node-1"));

            ignite.compute().broadcast(new MyTask());

            // Three-node cluster.
            Ignition.start(config("node-2"));

            ignite.compute().broadcast(new MyTask());
        }
        finally {
            Ignition.stopAll(true);
        }
    }

    /**
     * Puts {@code n} persons into the cache under random UUID keys. Each person gets a name
     * picked uniformly from {@link #PERSON_NAMES} and a random {@code int} value.
     *
     * @param cache Cache to fill.
     * @param n Number of entries to put; nothing is put when {@code n <= 0}.
     */
    public static void fillCache(IgniteCache<String, Person> cache, int n) {
        // One generator drives both the name choice and the value.
        Random rnd = new Random();

        for (int i = 0; i < n; i++) {
            String key = UUID.randomUUID().toString();

            String name = PERSON_NAMES[rnd.nextInt(PERSON_NAMES.length)];

            cache.put(key, new Person(name, rnd.nextInt()));
        }
    }

    /**
     * Cache value type. Both fields are exposed to SQL through {@link QuerySqlField}.
     */
    public static class Person {
        /** Person name; the column the test query groups by. */
        @QuerySqlField
        private String name;

        /** Arbitrary payload value. */
        @QuerySqlField
        private int value;

        /**
         * @param name Person name.
         * @param value Payload value.
         */
        public Person(String name, int value) {
            this.name = name;
            this.value = value;
        }
    }

    /**
     * Job that counts persons per name with a grouped SQL query on the node it runs on, prints
     * every result row prefixed with the local node ID and asserts that the per-name counts
     * add up to {@link #ENTRIES_COUNT}.
     */
    private static class MyTask implements IgniteRunnable {
        /** */
        private static final long serialVersionUID = 0L;

        /** Ignite instance of the node the job is executed on (injected by Ignite). */
        @IgniteInstanceResource
        Ignite ignite;

        /** {@inheritDoc} */
        @Override public void run() {
            UUID locNodeId = ignite.cluster().localNode().id();

            SqlFieldsQuery sql = new SqlFieldsQuery(
                "select name,count(name) from Person group by name;");

            // Typed cache reference keeps the query result free of unchecked conversions.
            IgniteCache<String, Person> cache = ignite.cache(CACHE_NAME);

            List<List<?>> result = cache.query(sql).getAll();

            // One row per distinct name at most; fewer if some name was never picked.
            assert result.size() <= PERSON_NAMES.length :
                "Too many groups [nodeId=" + locNodeId + ", groups=" + result.size() + ']';

            long itemsCnt = 0;

            for (List<?> row : result) {
                Object name = row.get(0);
                Object cnt = row.get(1);

                System.out.println(locNodeId + ": " + name + " : " + cnt);

                assert cnt instanceof Long :
                    "Unexpected count type [nodeId=" + locNodeId + ", cnt=" + cnt + ']';

                itemsCnt += (Long)cnt;
            }

            assert itemsCnt == ENTRIES_COUNT :
                "Unexpected total count [nodeId=" + locNodeId + ", expected=" + ENTRIES_COUNT +
                    ", actual=" + itemsCnt + ']';
        }
    }
}
