Remote Listener is deployed on every cache node and is invoked only on a
primary node for that key.
In other words, for each key there is only one invocation of the remote
filter, and that invocation is local to that key.

So you can place your processing logic into the Remote Filter.

On Mon, Apr 23, 2018 at 10:42 AM, Raymond Wilson <[email protected]
> wrote:

> Hi Pavel,
>
>
>
> Yes, I looked at continuous queries. They appear to be oriented toward a
> single context being sent the newly arrived elements in the cache from all
> primary nodes hosting the cache involved in the query.
>
>
>
> In the use case I outlined below, I would like to have the items processed
> in co-located contexts (ie: the data does not move and is processed in situ
> on the primary node). How do you do that with a continuous query?
>
>
>
> Thanks,
>
> Raymond.
>
>
>
> *From:* Pavel Tupitsyn [mailto:[email protected]]
> *Sent:* Monday, April 23, 2018 7:18 PM
> *To:* [email protected]
> *Subject:* Re: Using a cache as an affinity co-located processing buffer
> in Ignite.Net
>
>
>
> Hi Raymond,
>
>
>
> To process incoming data in a co-located fashion there is a Continuous
> Query feature [1].
>
> Looks like it fits your use case quite well.
>
>
>
>
>
> [1] https://apacheignite-net.readme.io/docs/continuous-queries
>
>
>
> On Mon, Apr 23, 2018 at 7:32 AM, Raymond Wilson <
> [email protected]> wrote:
>
> I did find ICache.GetLocalEntries() method and have written the following
> as a proof of concept (yet to exercise it though):
>
>
>
>             IEnumerable<ICacheEntry<BufferQueueKey, BufferQueueItem>>
> localItems = QueueCache.GetLocalEntries(new [] {CachePeekMode.Primary});
>
>
>
>             ICacheEntry<BufferQueueKey, BufferQueueItem> first =
> localItems.FirstOrDefault();
>
>
>
>             if (first != null)
>
>             {
>
>                 // Get the list of all items in the buffer matching the
> affinity key of the first item
>
>                 // in the list, limiting the result set to 100 TAG files.
>
>                 List<ICacheEntry<BufferQueueKey, BufferQueueItem>>
> candidates = localItems
>
>                     .Where(x => x.Value.AffinityKey ==
> first.Value.AffinityKey)
>
>                     .Take(100)
>
>                     .ToList();
>
>
>
>                 if (candidates?.Count > 0)
>
>                 {
>
>                     // Submit the list of items to the processor
>
>                     // ...
>
>                 }
>
>             }
>
>
>
> This seems like it should do what I want, but I’m a little suspicious that
> it may evaluate the entire content of the cache against the Where()
> condition before taking the first 100 results.
>
>
>
> I think I can constrain it by modifying the LINQ expression like this:
>
>
>
>                 List<ICacheEntry<BufferQueueKey, BufferQueueItem>>
> candidates = localItems
>
>                     .Take(100)
>
>                     .Where(x => x.Value.AffinityKey ==
> first.Value.AffinityKey)
>
>                     .ToList();
>
>
>
> Which will at least limit the overall number examined to be 100, while not
> capturing the first 100 that do match.
>
>
>
> I could further modify it to a ‘double-take’ which still constrains the
> overall query but improves the chances of filling the maximum take of 100
> matching items
>
>
>
>                 List<ICacheEntry<BufferQueueKey, BufferQueueItem>>
> candidates = localItems
>
>                     .Take(1000)
>
>                     .Where(x => x.Value.AffinityKey ==
> first.Value.AffinityKey)
>
>                     .Take(100)
>
>                     .ToList();
>
>
>
> Or is there a better way?
>
>
>
> Thanks,
>
> Raymond.
>
>
>
> *From:* Raymond Wilson [mailto:[email protected]]
> *Sent:* Monday, April 23, 2018 1:11 PM
> *To:* [email protected]
> *Subject:* Using a cache as an affinity co-located processing buffer in
> Ignite.Net
>
>
>
> All,
>
>
>
> I have been thinking about how to use Ignite.Net to support an affinity
> co-located ingest pipeline that uses queue buffering to provide fault
> tolerance and buffering for a flow of ingest packages.
>
>
>
> At a high level, it looks like this:
>
>
>
> Arrival pipeline: [Gateway] -> [PackageReceiver] -> [PackageCache,
> affinity co-located with PackageProcessor]
>
> Processing pipeline: [PackageCache] -> [PackageProcessor] ->
> [ProcessedDataCache affinity co-located with PackageProcessor]
>
>
>
> Essentially, I want a cache that look like this:
>
>
>
> Public class CacheItem
>
> {
>
>     Public DateTime date;
>
>
>
>   [AffinityKeyMapped]
>
>      public Guid AffinityKey;
>
>
>
>      public byte [] Package;
>
> }
>
>
>
>    ICache<string, CacheTime> BufferQueue.
>
>
>
> BufferQueue =  ignite.GetOrCreateCache <string, CacheItem > (
>
>                     new CacheConfiguration
>
>                     {
>
>                         Name = “BufferQueue”,
>
>
>
>                         KeepBinaryInStore = true,
>
>
>
>                         // Replicate the maps across nodes
>
>                         CacheMode = CacheMode.Partitioned,
>
>                     });
>
>             }
>
>
>
> This queue will target a data region that is configured for persistency.
>
>
>
> Inbound packages will arrive and be injected into the BufferQueue cache
> from some client node context, like this:
>
>
>
> public void HandleANewPackage(string key, Guid affinityKey, byte []
> package)
>
> {
>
> BufferQueue.Put(key, new CacheItem() {data = DateTime.Now(), AffinityKey =
> affinityKey, Package = package});
>
> }
>
>
>
> There will be a collection of server nodes that are responsible for the
> cache.
>
>
>
> This is all straightforward. The tricky bit is then processing the
> elements in the BufferQueue cache.
>
>
>
> The data is already on the server nodes, nicely co-located according to
> its affinity. I want to have parallel processing logic that runs on the
> server nodes that pulls elements from the buffer queue and processes them
> into some other cache(s).
>
>
>
> At this point I know I have a cache that may contain something needing to
> be processed, but I don’t know their keys. I know it’s possible to have
> logic running on each server node that does this (either as a Grid Service
> or a Compute::Broadcast() lambda):
>
>
>
> var cache = ignite.GetCache<string, CacheItem>("BufferQueue");
>
> var cursor = cache.Query(new ScanQuery<string, CacheItem >(new QueryFilter
> ()));
>
>
>
> foreach (var cacheEntry in cursor)
>
>     ProcessItem(CacheEntry);
>
>
>
> …but I am not sure how to restrict the elements in the cache returned to
> the query to be only those entries affinity co-located with the server
> asking for them.
>
>
>
> Is this so obvious that it just works and does not need documentation, or
> is this not possible and I should run the processing context from a client
> node context (as above) and pay the penalty of extracting the packages from
> the cache with cache.Query() and then resubmitting them using an affinity
> aware method like AffinityRun()?
>
>
>
> Thanks,
>
> Raymond.
>
>
>
>
>
>
>

Reply via email to