--code is as below
--all caches are partitioned
--args[0] is the type of query, args[1] is the number of queries, args[2] is the IP of the
cluster group (each node is a separate cluster group; the group name is
'i-<ipaddress>')
--we are trying with different args[2] values (cluster groups) and we are getting
duplicate results. The web console shows that a particular id is on only one
node (hence data partitioning and collocation are working fine).
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCluster;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import
org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
public class GetItemsWithClusterGroup {
public static void main(String args[]) {
//args[0] is type of query (0,1,2), args[1] is number of
queries, args[2]
is ip of cluster group
//we are trying with different args[2] and we are getting
duplicate result
int queryType = Integer.parseInt(args[0]);
int maxNumberOfQueries = Integer.parseInt(args[1]);
Ignition.setClientMode(true);
IgniteConfiguration conf = new IgniteConfiguration();
conf.setPeerClassLoadingEnabled(true);
TcpDiscoverySpi discovery = new TcpDiscoverySpi();
TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
String ipaddress = args[2];
ipFinder.setAddresses(Arrays.asList(ipaddress));
discovery.setIpFinder(ipFinder);
conf.setDiscoverySpi(discovery);
Ignite ignite = Ignition.start(conf);
System.out.println("IP=> " + ipaddress);
IgniteCluster cluster = ignite.cluster();
List<String> groups = Arrays.asList("i-"+ipaddress);
List<IgniteCompute> computes = new ArrayList<>();
IgniteCache<Integer, Item> itemCache =
cluster.ignite().getOrCreateCache("ITEMCACHE");
for (String group : groups) {
ClusterGroup cg = cluster.forAttribute("ROLE", group);
System.out.println(group + " " + cg.nodes());
computes.add(ignite.compute(cg).withAsync().withTimeout(5000));
}
int numberOfQueries = 0;
while (numberOfQueries < maxNumberOfQueries) {
numberOfQueries++;
process(ignite, computes, itemCache, queryType);
}
}
private static void process(Ignite ignite, List<IgniteCompute> computes,
IgniteCache<Integer, Item> cache,
int queryType) {
List<IgniteFuture<List<String>>> futures = new
ArrayList<>();
String geoId = "144";
StringBuilder buff = new StringBuilder();
final List args = new ArrayList<>();
if (queryType == 0) {
buff = new StringBuilder("SELECT T.uniqueSkuId FROM " +
"ITEMCACHE.Item
as T LIMIT 2");
}
if (queryType == 1) {
buff = new StringBuilder(
"SELECT r.id FROM IGPCACHE.Rank as r "
+ " WHERE r.geoId=? " + "
order by r.rank desc limit 3");
args.add(geoId);
}
if (queryType == 2) {
buff = new StringBuilder("SELECT r.id FROM
IGPCACHE.Rank as r " + "
WHERE r.geoId=? " + " limit 3");
args.add(geoId);
}
if (queryType == 3) {
buff = new StringBuilder("SELECT r.id FROM
IGPCACHE.Rank as r " + "
limit 3");
}
SqlFieldsQuery qry = new SqlFieldsQuery(buff.toString());
qry.setEnforceJoinOrder(true);
for (IgniteCompute async : computes) {
async.call(new IgniteCallable<List<String>>() {
@Override
public List<String> call() throws Exception {
try {
List<List<?>> res =
cache.query(qry.setArgs(args.toArray()).setLocal(true)).getAll();
List<String> items =
convert(res);
return items;
} catch (Exception e) {
System.out.println("exceptionx:
" + e.getMessage());
e.printStackTrace();
}
return null;
}
private List<String> convert(List<List<?>>
res) {
List<String> items = new ArrayList<>();
if (res != null) {
for (List<?> list : res) {
items.addAll((Collection<? extends String>) list);
}
}
return items;
}
});
IgniteFuture<List<String>> future = async.future();
futures.add(future);
}
List<String> list = new ArrayList<>();
for (IgniteFuture<List<String>> future : futures) {
try {
List<String> returnList = future.get(500,
TimeUnit.MILLISECONDS);
// System.out.println(" returning list
"+returnList);
if (returnList != null)
list.addAll(returnList);
} catch (Exception e) {
System.out.println("got exception1 " +
e.getMessage());
e.printStackTrace();
}
}
//displaying output list
System.out.println(list.size() + "#" + list);
}
}
--
View this message in context:
http://apache-ignite-users.70518.x6.nabble.com/sql-query-in-case-of-cluster-group-tp14884p14891.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.