--code is as below

--all caches are partitioned

--args[0] is type of query, args[1] is number of queries, args[2] is ip of
cluster group (each node is a separate cluster group. group name is
'i-<ipaddress>'

--we are trying with different args[2] (cluster groups) and we are getting
duplicate result. Web console shows that a particular id is in only in one
node (hence data partition and collocation is working fine(






import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCluster;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import
org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;



public class GetItemsWithClusterGroup {

        public static void main(String args[]) {
                //args[0] is type of query (0,1,2), args[1] is number of 
queries, args[2]
is ip of cluster group
                //we are trying with different args[2] and we are getting 
duplicate result
                int queryType = Integer.parseInt(args[0]);
                int maxNumberOfQueries = Integer.parseInt(args[1]);
                Ignition.setClientMode(true);
                IgniteConfiguration conf = new IgniteConfiguration();
                conf.setPeerClassLoadingEnabled(true);
                TcpDiscoverySpi discovery = new TcpDiscoverySpi();
                TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
                String ipaddress = args[2];
                ipFinder.setAddresses(Arrays.asList(ipaddress));
                discovery.setIpFinder(ipFinder);
                conf.setDiscoverySpi(discovery);
                Ignite ignite = Ignition.start(conf);
                System.out.println("IP=> " + ipaddress);
                IgniteCluster cluster = ignite.cluster();
                List<String> groups = Arrays.asList("i-"+ipaddress);
                List<IgniteCompute> computes = new ArrayList<>();
                IgniteCache<Integer, Item> itemCache =
cluster.ignite().getOrCreateCache("ITEMCACHE");
                for (String group : groups) {
                        ClusterGroup cg = cluster.forAttribute("ROLE", group);
                        System.out.println(group + " " + cg.nodes());
                        
computes.add(ignite.compute(cg).withAsync().withTimeout(5000));
                }

                int numberOfQueries = 0;

                while (numberOfQueries < maxNumberOfQueries) {
                        numberOfQueries++;
                        process(ignite, computes, itemCache, queryType);
                }

        }

        private static void process(Ignite ignite, List<IgniteCompute> computes,
IgniteCache<Integer, Item> cache,
                        int queryType) {

                List<IgniteFuture&lt;List&lt;String>>> futures = new 
ArrayList<>();

                String geoId = "144";
                StringBuilder buff = new StringBuilder();
                final List args = new ArrayList<>();
                
                if (queryType == 0) {
                        buff = new StringBuilder("SELECT T.uniqueSkuId FROM " + 
"ITEMCACHE.Item
as T LIMIT 2");
                }

                if (queryType == 1) {
                        buff = new StringBuilder(
                                        "SELECT  r.id FROM IGPCACHE.Rank as r " 
+ " WHERE  r.geoId=? " + "
order by r.rank desc limit 3");
                        args.add(geoId);

                }

                if (queryType == 2) {
                        buff = new StringBuilder("SELECT  r.id FROM 
IGPCACHE.Rank as r " + "
WHERE  r.geoId=? " + " limit 3");
                        args.add(geoId);

                }

                if (queryType == 3) {
                        buff = new StringBuilder("SELECT  r.id FROM 
IGPCACHE.Rank as r " + "
limit 3");

                }

                SqlFieldsQuery qry = new SqlFieldsQuery(buff.toString());
                qry.setEnforceJoinOrder(true);
                for (IgniteCompute async : computes) {
                        async.call(new IgniteCallable<List&lt;String>>() {
                                @Override
                                public List<String> call() throws Exception {
                                        try {
                                                List<List&lt;?>> res =
cache.query(qry.setArgs(args.toArray()).setLocal(true)).getAll();
                                                List<String> items = 
convert(res);
                                                return items;
                                        } catch (Exception e) {
                                                System.out.println("exceptionx: 
" + e.getMessage());
                                                e.printStackTrace();
                                        }

                                        return null;

                                }

                                private List<String> convert(List<List&lt;?>> 
res) {
                                        List<String> items = new ArrayList<>();
                                        if (res != null) {
                                                for (List<?> list : res) {
                                                        
items.addAll((Collection<? extends String>) list);
                                                }
                                        }
                                        return items;
                                }
                        });

                        IgniteFuture<List&lt;String>> future = async.future();
                        futures.add(future);
                }

                List<String> list = new ArrayList<>();

                for (IgniteFuture<List&lt;String>> future : futures) {
                        try {
                                List<String> returnList = future.get(500, 
TimeUnit.MILLISECONDS);
                                // System.out.println(" returning list 
"+returnList);
                                if (returnList != null)
                                        list.addAll(returnList);
                        } catch (Exception e) {
                                System.out.println("got exception1 " + 
e.getMessage());
                                e.printStackTrace();
                        }
                }
                //displaying output list
                System.out.println(list.size() + "#" + list);
        }

}



--
View this message in context: 
http://apache-ignite-users.70518.x6.nabble.com/sql-query-in-case-of-cluster-group-tp14884p14891.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Reply via email to