Hi, What do you think about an atomic counter of not successful trades, e.g. the atomic cache with a company name as a key and Integer as a counter. You can update the counter when add trade (increment) and at the listener of the continuous query (decrement) and do something when counter equals zero.
-- Taras Ledkov Mail-To: [email protected] On Wed, Dec 14, 2016 at 2:02 PM, begineer <[email protected]> wrote: > Hi, > My sample application processes trades for different companies stored in > Ignite cache. When all trades for particular company reaches SUCCESS stage, > an automatic notification should be triggered and some other > system/application will react to it. To do this, When ever any trade > reaches > SUCCESS stage, I detect it using Continuous query, I am comparing total > size > of trades for particular company in cache and the trades for that company > which are in SUCCESS stage. If they are equal, trigger a notification else > wait. > There are two drawbacks with this approach. > 1. I have to query through all the trades for that company twice every time > trade reach SUCCESS stage which is really really bad. > 2. notification can be triggered multiple times if multiple threads > processing trades in parallel in SUCCESS stage.(i.e lets say last 5 items > reach SUCCESS stage together, so 5 emails will be sent since continuous > query will run local listener 5 times) > > Is there a better way to do same task. I am hoping it is. My current > approach cannot work in real time application. > > Below is my code. > > > > public class TerminalEventsUsingContQuery { > IgniteCache<Integer, Trade> cache; > > public static void main(String[] args) { > new TerminalEventsUsingContQuery().test(); > } > > private void test() { > > Ignite ignite = Ignition.start("examples/ > config/example-ignite.xml"); > CacheConfiguration<Integer, Trade> config = new > CacheConfiguration<>("TradesCache"); > > cache = ignite.getOrCreateCache(config); > ContinuousQuery<Integer, Trade> query = new > ContinuousQuery<>(); > query.setLocalListener(events -> events.forEach(e -> > process(e.getValue()))); > > query.setRemoteFilterFactory(factoryOf(e -> > TradeStatus.SUCCESS.equals(e.getValue().getStatus()))); > query.setInitialQuery(new ScanQuery<Integer, Trade>((k, v) > -> > TradeStatus.SUCCESS.equals(v.getStatus()))); > buildData(); > QueryCursor<Entry<Integer, Trade>> cursor = > cache.query(query); > cursor.forEach(entry -> process(entry.getValue())); > Trade t9 = new Trade(9, TradeStatus.SUCCESS, "type1", 100); > cache.put(t9.getId(), t9); > } > > private void process(Trade trade) { > List<Entry<Integer, Trade>> totalperRef = cache > .query(new ScanQuery<Integer, Trade>((k, > v) -> v.getRef() == > trade.getRef())).getAll(); > > List<Entry<Integer, Trade>> totalSuccessForRef = > cache.query(new > ScanQuery<Integer, Trade>( > (k, v) -> v.getRef() == trade.getRef() && > TradeStatus.SUCCESS.equals(v.getStatus()))).getAll(); > > if (totalperRef.size() == totalSuccessForRef.size()) { > System.out.println("Terminal condition reached. > Notify the handler for : > " + trade.getRef()); > } else { > System.out.println("Terminal condition not reached > yet. Current > processing Trade : " + trade.getId()); > } > } > > private void buildData() { > Trade t1 = new Trade(1, TradeStatus.SUCCESS, "type1", 100); > Trade t2 = new Trade(2, TradeStatus.FAILED, "type1", 101); > Trade t3 = new Trade(3, TradeStatus.EXPIRED, "type1", 102); > Trade t4 = new Trade(4, TradeStatus.SUCCESS, "type1", 100); > Trade t5 = new Trade(5, TradeStatus.CHANGED, "type1", 103); > Trade t6 = new Trade(6, TradeStatus.SUCCESS, "type1", 100); > Trade t7 = new Trade(7, TradeStatus.CHANGED, "type1", 103); > Trade t8 = new Trade(8, TradeStatus.SUCCESS, "type1", 101); > cache.put(t1.getId(), t1); > cache.put(t2.getId(), t2); > cache.put(t3.getId(), t3); > cache.put(t4.getId(), t4); > cache.put(t5.getId(), t5); > cache.put(t6.getId(), t6); > cache.put(t7.getId(), t7); > cache.put(t8.getId(), t8); > } > } > > public class Trade { > private int id; > private TradeStatus status; > private String tradeType; > public Trade(int id, TradeStatus status, String tradeType) { > this.id = id; > this.status = status; > this.tradeType = tradeType; > } > > //setter getter, equals, hashcode methods > } > > public enum TradeStatus { > NEW, CHANGED, EXPIRED, FAILED, UNCHANGED , SUCCESS > } > > > > -- > View this message in context: http://apache-ignite-users. > 70518.x6.nabble.com/Detecting-terminal-condition-for-group- > of-items-in-Ignite-cache-tp9526.html > Sent from the Apache Ignite Users mailing list archive at Nabble.com. >
