Today I ran a test of async callbacks.
My source code is as follows:
ContinuousQuery.cpp
public class ContinuousQuery {
    /** Cache name. */
    private static final String CACHE_NAME_INPUTDATA = "inputdata";
    private static IgniteCache inputCache = null;
    static Random rand = new Random();

    public static void main(String[] args) throws Exception {
        Ignition.setClientMode(true);
        Ignite ignite = Ignition.start();
        System.out.println();
        System.out.println(">>> Cache continuous query example1 started.");
        inputCache = ignite.getOrCreateCache(CACHE_NAME_INPUTDATA);
        ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();
        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {
            @Override
            public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) {
            }
        });
        qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter<Integer, String>>() {
            @Override
            public CacheEntryEventFilter<Integer, String> create() {
                return new CacheEntryEventFilter<Integer, String>() {
                    @Override
                    public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) {
                        if (e.getEventType() == EventType.CREATED) {
                            System.out.println(Thread.currentThread().getName()+"***--->><key,value>=<"+e.getKey()+","+e.getValue()+">");
                            try {
                                Thread.sleep(rand.nextInt(1000));
                            } catch (InterruptedException e1) {
                                e1.printStackTrace();
                            }
                        }
                        return false;
                    }
                };
            }
        });
        inputCache.query(qry);
    }
}
Inputdata.cpp
public class InputData5 {
    private static final String CACHE_NAME_INPUTDATA = "inputdata";
    private static IgniteCache inputCache;

    public static void main(String[] args) throws Exception {
        Ignition.setClientMode(true);
        Ignite ignite = Ignition.start();
        CacheConfiguration cacheCfg = new CacheConfiguration();
        cacheCfg.setCacheMode(CacheMode.REPLICATED);
        cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
        cacheCfg.setName(CACHE_NAME_INPUTDATA);
        System.out.println();
        System.out.println(">>> input data started.");
        inputCache = ignite.getOrCreateCache(cacheCfg);
        int keyCnt = 900000;
        for (int i = 0; i < keyCnt; i++) {
            inputCache.put(i, Integer.toString(10000 + i));
        }
    }
}
I start an Ignite server first, then ContinuousQuery.cpp, and finally
inputdata.cpp.
If I do not add "@IgniteAsyncCallback" before the CacheEntryEventFilter, the
output is in order, for example:
callback-#414%null%----->><key,value>=<0,10000>
callback-#432%null%----->><key,value>=<1,10001>
callback-#430%null%----->><key,value>=<2,10002>
callback-#428%null%----->><key,value>=<3,10003>
callback-#426%null%----->><key,value>=<4,10004>
callback-#424%null%----->><key,value>=<5,10005>
callback-#422%null%----->><key,value>=<6,10006>
callback-#420%null%----->><key,value>=<7,10007>
callback-#418%null%----->><key,value>=<8,10008>
callback-#416%null%----->><key,value>=<9,10009>
But if I add "@IgniteAsyncCallback" before the CacheEntryEventFilter, the
output is out of order, for example:
callback-#414%null%----->><key,value>=<0,10000>
callback-#432%null%----->><key,value>=<9,10009>
callback-#430%null%----->><key,value>=<8,10008>
callback-#428%null%----->><key,value>=<7,10007>
callback-#426%null%----->><key,value>=<6,10006>
callback-#424%null%----->><key,value>=<5,10005>
callback-#422%null%----->><key,value>=<4,10004>
callback-#420%null%----->><key,value>=<3,10003>
callback-#418%null%----->><key,value>=<2,10002>
callback-#416%null%----->><key,value>=<1,10001>
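As a rough analogy for the two runs above, here is a minimal plain-Java sketch (no Ignite involved) of how handing events to a pool of several threads, each sleeping for a random time, can finish them in a different order than they were submitted, while a single thread keeps the order. The class name CallbackOrderSketch and the method finishOrder are made up for illustration, and this is only an assumption about the general effect, not a description of how Ignite dispatches callbacks internally.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: plain java.util.concurrent only, no Ignite API.
public class CallbackOrderSketch {

    /**
     * Submits keys 0..events-1 to a fixed pool with the given number of
     * threads and returns the keys in the order their tasks finished.
     */
    static List<Integer> finishOrder(int threads, int events) {
        List<Integer> finished = Collections.synchronizedList(new ArrayList<>());
        Random rand = new Random();
        ExecutorService pool = Executors.newFixedThreadPool(threads);

        for (int i = 0; i < events; i++) {
            final int key = i;
            pool.submit(() -> {
                try {
                    // Stands in for the random sleep inside the filter.
                    Thread.sleep(rand.nextInt(20));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                finished.add(key);
            });
        }

        // Wait for all submitted tasks to complete before reading the result.
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(finished);
    }

    public static void main(String[] args) {
        // One thread: tasks run one after another, in submission order.
        System.out.println("1 thread : " + finishOrder(1, 10));
        // Several threads: the finish order depends on the random sleeps.
        System.out.println("4 threads: " + finishOrder(4, 10));
    }
}
```

With one thread the list always comes back as 0 through 9. With four threads every key is still processed exactly once, but the order can change from run to run.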
The order of events is important to me. Is there anything wrong with my
test?
--
View this message in context:
http://apache-ignite-users.70518.x6.nabble.com/Can-continuousAsyncQuery-guarantee-event-to-be-processed-in-order-tp10521p10596.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.