Hi,

What do you think about keeping an atomic counter of the trades that have
not yet succeeded? For example, use an atomic cache with the company name
as the key and an Integer counter as the value. Increment the counter when
you add a trade, decrement it in the local listener of the continuous
query, and trigger the notification when the counter reaches zero.
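
To make the idea concrete, here is a minimal sketch of just the counting
logic in plain Java. It is not Ignite code: a ConcurrentHashMap stands in
for the atomic cache, and the names PendingTradeCounter, tradeAdded,
tradeSucceeded and pendingFor are made up for illustration. In Ignite the
same increment/decrement would have to be done atomically on the cache
entry for the company.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

/** Tracks, per company, how many trades have not yet reached SUCCESS. */
class PendingTradeCounter {
    // Company name -> number of trades not yet in SUCCESS.
    // Stand-in for the atomic cache (assumption, not the Ignite API).
    private final ConcurrentMap<String, Integer> pending = new ConcurrentHashMap<>();

    // Hypothetical callback invoked once when a company's counter hits zero.
    private final Consumer<String> onAllSucceeded;

    PendingTradeCounter(Consumer<String> onAllSucceeded) {
        this.onAllSucceeded = onAllSucceeded;
    }

    /** Call when a trade is added for the company. */
    void tradeAdded(String company) {
        pending.merge(company, 1, Integer::sum);
    }

    /** Call from the continuous query listener when a trade reaches SUCCESS. */
    void tradeSucceeded(String company) {
        boolean[] reachedZero = {false};
        // computeIfPresent is atomic per key, so only the caller whose
        // decrement takes the counter from 1 to 0 sets the flag.
        pending.computeIfPresent(company, (key, count) -> {
            if (count > 1) {
                return count - 1;
            }
            reachedZero[0] = true;
            return null; // returning null removes the entry
        });
        if (reachedZero[0]) {
            onAllSucceeded.accept(company);
        }
    }

    /** Number of trades still pending for the company (0 if unknown). */
    int pendingFor(String company) {
        return pending.getOrDefault(company, 0);
    }
}
```

Because the decrement and the zero check happen in one atomic step, five
trades finishing together produce a single notification instead of five,
and no scan over the trades is needed. Note that this only works if all
trades for a company are counted before the last one succeeds; otherwise
the counter can reach zero too early.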

-- 

Taras Ledkov
Mail-To: [email protected]


On Wed, Dec 14, 2016 at 2:02 PM, begineer <[email protected]> wrote:

> Hi,
> My sample application processes trades for different companies stored in
> an Ignite cache. When all trades for a particular company reach the
> SUCCESS stage, an automatic notification should be triggered so that some
> other system/application can react to it. To do this, whenever any trade
> reaches the SUCCESS stage, I detect it using a continuous query and
> compare the total number of trades for that company in the cache with the
> number of its trades that are in the SUCCESS stage. If they are equal, I
> trigger a notification; otherwise I wait.
> There are two drawbacks with this approach.
> 1. I have to scan all the trades for that company twice every time a
> trade reaches the SUCCESS stage, which is very inefficient.
> 2. The notification can be triggered multiple times if multiple threads
> move trades into the SUCCESS stage in parallel (e.g. if the last 5 items
> reach the SUCCESS stage together, 5 emails will be sent, since the
> continuous query will run the local listener 5 times).
>
> Is there a better way to do the same task? I hope there is. My current
> approach cannot work in a real-time application.
>
> Below is my code.
>
>
>
> import static javax.cache.configuration.FactoryBuilder.factoryOf;
>
> import java.util.List;
> import javax.cache.Cache.Entry;
> import org.apache.ignite.Ignite;
> import org.apache.ignite.IgniteCache;
> import org.apache.ignite.Ignition;
> import org.apache.ignite.cache.query.ContinuousQuery;
> import org.apache.ignite.cache.query.QueryCursor;
> import org.apache.ignite.cache.query.ScanQuery;
> import org.apache.ignite.configuration.CacheConfiguration;
>
> public class TerminalEventsUsingContQuery {
>         IgniteCache<Integer, Trade> cache;
>
>         public static void main(String[] args) {
>                 new TerminalEventsUsingContQuery().test();
>         }
>
>         private void test() {
>
>                 Ignite ignite = Ignition.start("examples/config/example-ignite.xml");
>                 CacheConfiguration<Integer, Trade> config = new CacheConfiguration<>("TradesCache");
>
>                 cache = ignite.getOrCreateCache(config);
>                 ContinuousQuery<Integer, Trade> query = new ContinuousQuery<>();
>                 query.setLocalListener(events -> events.forEach(e -> process(e.getValue())));
>
>                 query.setRemoteFilterFactory(
>                                 factoryOf(e -> TradeStatus.SUCCESS.equals(e.getValue().getStatus())));
>                 query.setInitialQuery(new ScanQuery<Integer, Trade>(
>                                 (k, v) -> TradeStatus.SUCCESS.equals(v.getStatus())));
>                 buildData();
>                 QueryCursor<Entry<Integer, Trade>> cursor = cache.query(query);
>                 cursor.forEach(entry -> process(entry.getValue()));
>                 Trade t9 = new Trade(9, TradeStatus.SUCCESS, "type1", 100);
>                 cache.put(t9.getId(), t9);
>         }
>
>         private void process(Trade trade) {
>                 List<Entry<Integer, Trade>> totalperRef = cache
>                                 .query(new ScanQuery<Integer, Trade>(
>                                                 (k, v) -> v.getRef() == trade.getRef()))
>                                 .getAll();
>
>                 List<Entry<Integer, Trade>> totalSuccessForRef = cache
>                                 .query(new ScanQuery<Integer, Trade>(
>                                                 (k, v) -> v.getRef() == trade.getRef()
>                                                                 && TradeStatus.SUCCESS.equals(v.getStatus())))
>                                 .getAll();
>
>                 if (totalperRef.size() == totalSuccessForRef.size()) {
>                         System.out.println("Terminal condition reached. Notify the handler for : "
>                                         + trade.getRef());
>                 } else {
>                         System.out.println("Terminal condition not reached yet. Current processing Trade : "
>                                         + trade.getId());
>                 }
>         }
>
>         private void buildData() {
>                 Trade t1 = new Trade(1, TradeStatus.SUCCESS, "type1", 100);
>                 Trade t2 = new Trade(2, TradeStatus.FAILED, "type1", 101);
>                 Trade t3 = new Trade(3, TradeStatus.EXPIRED, "type1", 102);
>                 Trade t4 = new Trade(4, TradeStatus.SUCCESS, "type1", 100);
>                 Trade t5 = new Trade(5, TradeStatus.CHANGED, "type1", 103);
>                 Trade t6 = new Trade(6, TradeStatus.SUCCESS, "type1", 100);
>                 Trade t7 = new Trade(7, TradeStatus.CHANGED, "type1", 103);
>                 Trade t8 = new Trade(8, TradeStatus.SUCCESS, "type1", 101);
>                 cache.put(t1.getId(), t1);
>                 cache.put(t2.getId(), t2);
>                 cache.put(t3.getId(), t3);
>                 cache.put(t4.getId(), t4);
>                 cache.put(t5.getId(), t5);
>                 cache.put(t6.getId(), t6);
>                 cache.put(t7.getId(), t7);
>                 cache.put(t8.getId(), t8);
>         }
> }
>
> public class Trade {
>         private int id;
>         private TradeStatus status;
>         private String tradeType;
>         // Reference shared by all trades of one group; read via getRef().
>         private int ref;
>
>         public Trade(int id, TradeStatus status, String tradeType, int ref) {
>                 this.id = id;
>                 this.status = status;
>                 this.tradeType = tradeType;
>                 this.ref = ref;
>         }
>
> //setter getter, equals, hashcode methods
> }
>
> public enum TradeStatus {
>         NEW, CHANGED, EXPIRED, FAILED, UNCHANGED , SUCCESS
> }
>
>
>
> --
> View this message in context:
> http://apache-ignite-users.70518.x6.nabble.com/Detecting-terminal-condition-for-group-of-items-in-Ignite-cache-tp9526.html
> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
>
