Hi!

You perform cache operation from filter. The callback is invoked from
sensitive part of code and in synchronous mode it can lead to thread
starvation and deadlock (see ContinuousQuery#setRemoteFilterFactory
javadoc). @IgniteAsyncCallback was designed exactly for resolving this
issue.

> but it can not guarantee event to be processed in order
@IgniteAsyncCallback have the same guarantee. Updates for key will be performed
sequentially.

Thanks,
Nikolay

On Mon, Feb 20, 2017 at 11:38 AM, ght230 <[email protected]> wrote:

> I found Ignite cluster unstable when doing continuous query.
> Following is my test code.
>
> ContinuousQuery.java
> public class ContinuousQuery {
>         /** Cache name. */
>         private static final String CACHE_NAME_INPUTDATA = "inputdata";
>         private static final String CACHE_NAME_UPDATEDATA = "updatedata";
>         private static IgniteCache inputCache = null;
>         private static IgniteCache updateCache = null;
>     static Random rand=new Random();
>
>         public static void main(String[] args) throws Exception {
>                 Ignition.setClientMode(true);
>                 Ignite ignite = Ignition.start();
>
>                 CacheConfiguration cacheCfg = new CacheConfiguration();
>                 cacheCfg.setCacheMode(CacheMode.PARTITIONED);
>                 cacheCfg.setAtomicityMode(CacheAtomicityMode.
> TRANSACTIONAL);
>                 cacheCfg.setName(CACHE_NAME_INPUTDATA);
>                 inputCache = ignite.getOrCreateCache(cacheCfg);
>
>                 CacheConfiguration cacheCfg1 = new CacheConfiguration();
>                 cacheCfg1.setCacheMode(CacheMode.REPLICATED);
>                 cacheCfg1.setAtomicityMode(CacheAtomicityMode.
> TRANSACTIONAL);
>                 cacheCfg1.setName(CACHE_NAME_UPDATEDATA);
>                 updateCache = ignite.getOrCreateCache(cacheCfg1);
>
>                 ContinuousQuery<Integer, String> qry = new
> ContinuousQuery<>();
>
>                 qry.setLocalListener(new CacheEntryUpdatedListener<Integer,
> String>() {
>                         @Override
>                         public void onUpdated(Iterable<CacheEntryEvent&lt;?
> extends Integer, ?
> extends String>> evts) {
>                         }
>                 });
>
>                 qry.setRemoteFilterFactory(new
> Factory<CacheEntryEventFilter&lt;Integer,
> String>>() {
>             @Override public CacheEntryEventFilter<Integer, String>
> create()
> {
>                 return new CacheEntryFilter();
>             }
>                 });
>                 inputCache.query(qry);
>         }
>
>     private static class CacheEntryFilter implements
> CacheEntryEventFilter<Integer, String> {
>         /** Ignite instance. */
>         @IgniteInstanceResource
>         private Ignite ignite;
>
>         /** {@inheritDoc} */
>         @Override public boolean evaluate(final CacheEntryEvent<? extends
> Integer, ? extends String> e)
>             throws CacheEntryListenerException {
>                 IgniteCache<Integer, String> syncCache =
> ignite.cache(CACHE_NAME_UPDATEDATA);
>                 if (e.getEventType() == EventType.CREATED && e.getKey() <
> 100000 &&
> e.getKey() % 5000 != 0){
>
> System.out.println(Thread.currentThread().getName()+"***
> --->><key,value>=<"+e.getKey()+","+e.getValue()+">"
> );
>                     syncCache.put(e.getKey(),
> e.getValue()+"_______"+System.currentTimeMillis());
>                     syncCache.remove(e.getKey());
>                                 return false;
>                 }
>                         return true;
>         }
>     }
> }
>
> InputData.java
> public class InputData {
>         /** Cache name. */
>         private static final String CACHE_NAME_INPUTDATA = "inputdata";
>         private static IgniteCache inputCache;
>
>         public static void main(String[] args) throws Exception {
>                 Ignition.setClientMode(true);
>                 Ignite ignite = Ignition.start();
>
>                 inputCache = ignite.getOrCreateCache(CACHE_
> NAME_INPUTDATA);
>
>                 // Auto-close cache at the end of the example.
>                 int keyCnt = 1000000;
>
>                 // These entries will be queried by initial predicate.
>                 for (int i = 0; i < keyCnt; i++){
>                         inputCache.put(i, Integer.toString(10000 + i));
>                 }
>         }
> }
>
> My test steps as following:
> step 1:start an Ignite server.
> step 2: start an Ignite client of ContinuousQuery.jar
> step 3: start an Ignite client of InputData.jar
>
> Then If I start another Ignite server or even start ignitevisorcmd, the
> Ignite cluster will be jammed.
>
> If I add @IgniteAsyncCallback to CacheEntryFilter, the cluster will not be
> jammed, but it can not guarantee event to be processed in order.
>
> I would like to know what solution can meet my follwong needs?
> 1. The event should be triggered in order.
> 2. The process in function "evaluate" should be in order.(in the example,
> "put" and "remove" operation should be in order)
>
>
>
> --
> View this message in context: http://apache-ignite-users.
> 70518.x6.nabble.com/Ignite-cluster-unstable-when-doing-
> continuous-query-tp10726.html
> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
>

Reply via email to