Hi KJQ,

Thank you for sharing the details. Things are clearer to me now.

I think you need to enable the event types you want to receive,
because Ignite events are disabled by default. You can call
setIncludeEventTypes(EventType.EVT_CACHE_OBJECT_PUT,
EventType.EVT_CACHE_OBJECT_EXPIRED) on your IgniteConfiguration. I also
noticed that your remote filter calls ig.compute() on the Ignite
instance (ig) that is local to your calling code. The filter needs the
Ignite instance of the node where the event listener is called, which
you can inject with @IgniteInstanceResource. You can find an example of
such injection in ComputeFibonacciContinuationExample in the Ignite
sources.
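
To make the two points above concrete, here is a minimal plain-Java
sketch. It is only a model under stated assumptions: it uses no Ignite
classes, and Node, Cluster, NodeListener and demoCluster are
hypothetical names invented for illustration. The enableExpiredEvents()
switch stands in for setIncludeEventTypes(...), and the node passed to
the listener stands in for the instance that @IgniteInstanceResource
would inject.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Plain-Java model of the flow; no Ignite classes are used. All names are hypothetical. */
public class ExpiryFanOutSketch {

    /** One service pod: a node with its own local (Caffeine-like) cache. */
    static final class Node {
        final String id;
        final Map<String, String> localCache = new ConcurrentHashMap<>();

        Node(String id) { this.id = id; }
    }

    /** A listener that runs on a node and is handed that node, never the caller's. */
    interface NodeListener {
        void onExpired(Node injectedNode, String key);
    }

    /** Minimal cluster: tracks nodes and delivers "expired" notifications to all of them. */
    static final class Cluster {
        private final List<Node> nodes = new ArrayList<>();
        private boolean expiredEventsEnabled; // off by default, as events are in Ignite

        void join(Node node) { nodes.add(node); }

        void enableExpiredEvents() { expiredEventsEnabled = true; }

        /** Returns how many nodes were notified; 0 while the event type is disabled. */
        int expire(String key, NodeListener listener) {
            if (!expiredEventsEnabled) {
                return 0;
            }
            for (Node node : nodes) {
                listener.onExpired(node, key); // each call sees its own node
            }
            return nodes.size();
        }
    }

    /** Builds a two-node cluster where both local caches hold the same key. */
    static Cluster demoCluster(Node a, Node b) {
        Cluster cluster = new Cluster();
        cluster.join(a);
        cluster.join(b);
        a.localCache.put("user:1", "cached");
        b.localCache.put("user:1", "cached");
        return cluster;
    }

    public static void main(String[] args) {
        Node a = new Node("node-a");
        Node b = new Node("node-b");
        Cluster cluster = demoCluster(a, b);

        // Evict from the local cache of whichever node the listener runs on.
        NodeListener evict = (node, key) -> node.localCache.remove(key);

        System.out.println("notified before enabling: " + cluster.expire("user:1", evict));
        cluster.enableExpiredEvents();
        System.out.println("notified after enabling: " + cluster.expire("user:1", evict));
        System.out.println("node-a still caches key: " + a.localCache.containsKey("user:1"));
        System.out.println("node-b still caches key: " + b.localCache.containsKey("user:1"));
    }
}
```

In a real deployment the listener body would evict from Caffeine
rather than from a map, and the delivery to other nodes would go
through Ignite itself; the sketch only shows why nothing arrives until
the event type is enabled and why the listener should work with the
node it runs on.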

Tue, Oct 1, 2019 at 18:24, KJQ <[email protected]>:
>
> Thank you for the quick response - it is much appreciated.  We are still
> working through our Ignite integration, so far it has been nice, but it is
> definitely a learning curve.
>
> So, we have an environment that is deployed as services in a K8S cluster.
> Each service uses Caffeine as the in-memory cache (I know Ignite has a
> near-cache, but we cannot make that change now).  Caffeine is also being tested
> inside of Reactive Flux/Mono calls making it harder to change quickly.  Each
> service, a deployed pod, is also an Ignite "server" node as well.  We use
> Ignite, partitioned, as the primary cache (and some distributed compute with
> Drools).
>
> Because all of the nodes use Caffeine, which becomes just a near-cache when
> Ignite is included, we would like Ignite to raise an "expired" event to all
> the nodes and evict that item from Caffeine (before Ignite, Caffeine was
> used as the only in-memory cache per service) - we want to clean up the local
> caches on all the nodes.  Each Ignite cache configuration has an expiration
> policy set up.
>
> 1) I tried using the `addCacheEntryListenerConfiguration` with each Ignite
> cache thinking this was a better choice because (I thought) it was backed by
> the continuous query and would not require me to explicitly use events.
> But, it looked like that only fired on the node where the operation happened
> (i.e. locally).  Maybe I could broadcast the event from within this listener
> to the other nodes?
>
> 2) My next attempt is using the "remoteListen()."  If I understand you
> correctly, I do not need a "local listener" but the "remote listener" should
> broadcast a message when it is triggered?
>
> *Couple of things I noticed in my test below:*
> - If I take out the PUT, it seems I never see any callback notifications
> - Without the local event listener I do not see any expiration messages
> (possibly because of where the data is being cached in the test - local vs.
> remote node)
>
> Basically, I would like to do this:
> 1) R/W to Ignite cache with an "expiration" policy
> 2) When Ignite decides to "expire" an item raise an event to all Ignite
> nodes
> 3) From the event, evict the cache item locally.
>
> This is what I have right now for testing:
>
> ig.events(
>     ig.cluster().forServers())
>     .remoteListen(
>           new IgniteBiPredicate<UUID, CacheEvent>()
>           {
>                     @Override
>                     public boolean apply(UUID uuid, CacheEvent evt)
>                     {
>                         log.debug("Received local event {} {} {}  // {} // {} ",
>                                   evt.cacheName(),
>                                   evt.name(),
>                                   evt.key(),
>                                   evt.node().id(),
>                                   evt.eventNode().id());
>                         return true; // Continue listening.
>                     }
>           },
>         new IgnitePredicate<CacheEvent>()
>         {
>             @Override
>             public boolean apply(final CacheEvent evt)
>             {
>                 log.debug("Received remote event {} {} {}  / {} {} ",
>                           evt.cacheName(),
>                           evt.name(),
>                           evt.key(),
>                           evt.node().id(),
>                           evt.eventNode().id());
>
>                 //Broadcast the callback
>                 ig.compute().broadcastAsync(new IgniteCallable<Object>()
>                 {
>                     @Override
>                     public Object call() throws Exception
>                     {
>                         log.debug("Received callback {} {} {}  / {} {} ",
>                                  evt.cacheName(),
>                                  evt.name(),
>                                  evt.key(),
>                                  evt.node().id(),
>                                  evt.eventNode().id());
>                         return null;
>                     }
>                 });
>
>                 return true;
>             }
>         },
>         EVT_CACHE_OBJECT_PUT,
>         EVT_CACHE_OBJECT_EXPIRED);
>
> -----
> KJQ
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/



-- 
Best regards,
Ivan Pavlukhin
