Hi!

You are performing cache operations from inside the remote filter. The filter callback is invoked from a sensitive part of the code, and in synchronous mode cache operations there can lead to thread starvation and deadlock (see the ContinuousQuery#setRemoteFilterFactory javadoc). @IgniteAsyncCallback was designed exactly to resolve this issue.
> but it can not guarantee event to be processed in order

@IgniteAsyncCallback gives the same guarantee: updates for a given key are performed sequentially.

Thanks,
Nikolay

On Mon, Feb 20, 2017 at 11:38 AM, ght230 <[email protected]> wrote:

> I found the Ignite cluster unstable when doing a continuous query.
> Following is my test code.
>
> ContinuousQuery.java
> public class ContinuousQuery {
>     /** Cache name. */
>     private static final String CACHE_NAME_INPUTDATA = "inputdata";
>     private static final String CACHE_NAME_UPDATEDATA = "updatedata";
>     private static IgniteCache inputCache = null;
>     private static IgniteCache updateCache = null;
>     static Random rand=new Random();
>
>     public static void main(String[] args) throws Exception {
>         Ignition.setClientMode(true);
>         Ignite ignite = Ignition.start();
>
>         CacheConfiguration cacheCfg = new CacheConfiguration();
>         cacheCfg.setCacheMode(CacheMode.PARTITIONED);
>         cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
>         cacheCfg.setName(CACHE_NAME_INPUTDATA);
>         inputCache = ignite.getOrCreateCache(cacheCfg);
>
>         CacheConfiguration cacheCfg1 = new CacheConfiguration();
>         cacheCfg1.setCacheMode(CacheMode.REPLICATED);
>         cacheCfg1.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
>         cacheCfg1.setName(CACHE_NAME_UPDATEDATA);
>         updateCache = ignite.getOrCreateCache(cacheCfg1);
>
>         ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();
>
>         qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {
>             @Override
>             public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) {
>             }
>         });
>
>         qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter<Integer, String>>() {
>             @Override public CacheEntryEventFilter<Integer, String> create() {
>                 return new CacheEntryFilter();
>             }
>         });
>         inputCache.query(qry);
>     }
>
>     private static class CacheEntryFilter implements CacheEntryEventFilter<Integer, String> {
>         /** Ignite instance. */
>         @IgniteInstanceResource
>         private Ignite ignite;
>
>         /** {@inheritDoc} */
>         @Override public boolean evaluate(final CacheEntryEvent<? extends Integer, ? extends String> e)
>             throws CacheEntryListenerException {
>             IgniteCache<Integer, String> syncCache = ignite.cache(CACHE_NAME_UPDATEDATA);
>             if (e.getEventType() == EventType.CREATED && e.getKey() < 100000 &&
>                 e.getKey() % 5000 != 0){
>                 System.out.println(Thread.currentThread().getName()+"*** --->><key,value>=<"+e.getKey()+","+e.getValue()+">");
>                 syncCache.put(e.getKey(), e.getValue()+"_______"+System.currentTimeMillis());
>                 syncCache.remove(e.getKey());
>                 return false;
>             }
>             return true;
>         }
>     }
> }
>
> InputData.java
> public class InputData {
>     /** Cache name. */
>     private static final String CACHE_NAME_INPUTDATA = "inputdata";
>     private static IgniteCache inputCache;
>
>     public static void main(String[] args) throws Exception {
>         Ignition.setClientMode(true);
>         Ignite ignite = Ignition.start();
>
>         inputCache = ignite.getOrCreateCache(CACHE_NAME_INPUTDATA);
>
>         // Auto-close cache at the end of the example.
>         int keyCnt = 1000000;
>
>         // These entries will be queried by initial predicate.
>         for (int i = 0; i < keyCnt; i++){
>             inputCache.put(i, Integer.toString(10000 + i));
>         }
>     }
> }
>
> My test steps are as follows:
> step 1: start an Ignite server.
> step 2: start an Ignite client of ContinuousQuery.jar
> step 3: start an Ignite client of InputData.jar
>
> Then if I start another Ignite server, or even start ignitevisorcmd, the
> Ignite cluster will be jammed.
>
> If I add @IgniteAsyncCallback to CacheEntryFilter, the cluster will not be
> jammed, but it can not guarantee event to be processed in order.
>
> I would like to know what solution can meet my following needs?
> 1. The events should be triggered in order.
> 2. The processing in function "evaluate" should be in order (in the example,
> the "put" and "remove" operations should be in order).
>
> --
> View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Ignite-cluster-unstable-when-doing-continuous-query-tp10726.html
> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
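
P.S. To illustrate the per-key ordering point above, here is a minimal sketch in plain Java. It assumes an asynchronous callback pool that is striped, so that every callback for a given key is routed to the same single-threaded stripe. This is only a conceptual model, not Ignite's actual implementation; StripedCallbackPool, execute, shutdownAndWait and demo are hypothetical names made up for the illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of a striped callback pool: one single-threaded executor per stripe. */
public class StripedCallbackPool {
    private final ExecutorService[] stripes;

    public StripedCallbackPool(int stripeCnt) {
        stripes = new ExecutorService[stripeCnt];
        for (int i = 0; i < stripeCnt; i++)
            stripes[i] = Executors.newSingleThreadExecutor();
    }

    /** Callbacks for the same key always go to the same stripe, so they run in submission order. */
    public void execute(int key, Runnable callback) {
        stripes[Math.floorMod(key, stripes.length)].execute(callback);
    }

    /** Stops accepting callbacks and waits for the queued ones to finish. */
    public boolean shutdownAndWait(long timeoutMs) throws InterruptedException {
        for (ExecutorService s : stripes)
            s.shutdown();
        boolean done = true;
        for (ExecutorService s : stripes)
            done &= s.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        return done;
    }

    /** Submits updatesPerKey numbered updates for each key and records the order in which they ran. */
    public static Map<Integer, List<Integer>> demo(int keys, int updatesPerKey) {
        Map<Integer, List<Integer>> seen = new ConcurrentHashMap<>();
        for (int k = 0; k < keys; k++)
            seen.put(k, Collections.synchronizedList(new ArrayList<Integer>()));

        StripedCallbackPool pool = new StripedCallbackPool(4);
        for (int seq = 0; seq < updatesPerKey; seq++) {
            for (int key = 0; key < keys; key++) {
                final int k = key;
                final int s = seq;
                // The submitting thread returns immediately; the work runs on the stripe's thread.
                pool.execute(k, () -> seen.get(k).add(s));
            }
        }

        try {
            if (!pool.shutdownAndWait(10_000))
                throw new IllegalStateException("callbacks did not finish in time");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        // Each key's list shows the order in which its updates were processed.
        demo(3, 5).forEach((key, updates) -> System.out.println(key + " -> " + updates));
    }
}
```

In this model, different keys may be processed in parallel on different stripes, so there is no global order across keys, but the callbacks for any single key never overtake each other.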
