Hi Raymond,

To process incoming data in a co-located fashion, you can use the Continuous
Query feature [1].
It looks like a good fit for your use case.


[1] https://apacheignite-net.readme.io/docs/continuous-queries

On Mon, Apr 23, 2018 at 7:32 AM, Raymond Wilson <[email protected]>
wrote:

> I did find the ICache.GetLocalEntries() method and have written the following
> as a proof of concept (yet to exercise it though):
>
>             IEnumerable<ICacheEntry<BufferQueueKey, BufferQueueItem>> localItems =
>                 QueueCache.GetLocalEntries(new[] { CachePeekMode.Primary });
>
>             ICacheEntry<BufferQueueKey, BufferQueueItem> first = localItems.FirstOrDefault();
>
>             if (first != null)
>             {
>                 // Get the list of all items in the buffer matching the affinity key
>                 // of the first item in the list, limiting the result set to 100 TAG files.
>                 List<ICacheEntry<BufferQueueKey, BufferQueueItem>> candidates = localItems
>                     .Where(x => x.Value.AffinityKey == first.Value.AffinityKey)
>                     .Take(100)
>                     .ToList();
>
>                 if (candidates?.Count > 0)
>                 {
>                     // Submit the list of items to the processor
>                     // ...
>                 }
>             }
>
> This seems like it should do what I want, but I’m a little suspicious that
> it may evaluate the entire content of the cache against the Where()
> condition before taking the first 100 results.
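
On the Where()/Take() concern: LINQ to Objects evaluates Where() and Take()
lazily, so Where(...).Take(100) stops pulling from its source as soon as 100
matches have been found. It only walks the whole source when fewer than 100
entries match. The sketch below shows this with a plain in-memory sequence.
LazyTakeDemo, Source and Run are hypothetical names made up for the
illustration, and it assumes (not verified here) that the enumerable returned
by GetLocalEntries() also streams its entries on demand.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Illustration only: counts how many source items Where().Take() really pulls.
public static class LazyTakeDemo
{
    // Yields 0..count-1 and calls onPull each time the consumer asks for an item.
    private static IEnumerable<int> Source(int count, Action onPull)
    {
        for (var i = 0; i < count; i++)
        {
            onPull();
            yield return i;
        }
    }

    // Runs source.Where(match).Take(take) and reports how many items were
    // examined and how many were returned.
    public static (int Examined, int Returned) Run(int sourceSize, Func<int, bool> match, int take)
    {
        var examined = 0;
        var result = Source(sourceSize, () => examined++)
            .Where(match)
            .Take(take)
            .ToList();
        return (examined, result.Count);
    }

    public static void Main()
    {
        // Every second item matches: enumeration stops after about 200 items,
        // not after one million.
        Console.WriteLine(Run(1_000_000, x => x % 2 == 0, 100));

        // Only 10 items match: the whole source is scanned looking for more.
        Console.WriteLine(Run(1_000_000, x => x < 10, 100));
    }
}
```

So the original Where(...).Take(100) is already bounded when matches are
common. When matches are rare, an up-front Take(1000) is what caps the scan,
at the cost of possibly returning fewer than 100 matches.
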
>
> I think I can constrain it by modifying the LINQ expression like this:
>
>                 List<ICacheEntry<BufferQueueKey, BufferQueueItem>> candidates = localItems
>                     .Take(100)
>                     .Where(x => x.Value.AffinityKey == first.Value.AffinityKey)
>                     .ToList();
>
> This will at least limit the overall number of entries examined to 100, but it
> no longer captures the first 100 that do match.
>
> I could further modify it to a ‘double-take’, which still constrains the
> overall query but improves the chances of filling the maximum take of 100
> matching items:
>
>                 List<ICacheEntry<BufferQueueKey, BufferQueueItem>> candidates = localItems
>                     .Take(1000)
>                     .Where(x => x.Value.AffinityKey == first.Value.AffinityKey)
>                     .Take(100)
>                     .ToList();
>
> Or is there a better way?
>
> Thanks,
> Raymond.
>
> *From:* Raymond Wilson [mailto:[email protected]]
> *Sent:* Monday, April 23, 2018 1:11 PM
> *To:* [email protected]
> *Subject:* Using a cache as an affinity co-located processing buffer in
> Ignite.Net
>
> All,
>
> I have been thinking about how to use Ignite.Net to support an affinity
> co-located ingest pipeline that uses queue buffering to provide fault
> tolerance and buffering for a flow of ingest packages.
>
> At a high level, it looks like this:
>
> Arrival pipeline: [Gateway] -> [PackageReceiver] -> [PackageCache,
> affinity co-located with PackageProcessor]
>
> Processing pipeline: [PackageCache] -> [PackageProcessor] ->
> [ProcessedDataCache, affinity co-located with PackageProcessor]
>
> Essentially, I want a cache that looks like this:
>
> public class CacheItem
> {
>     public DateTime date;
>
>     [AffinityKeyMapped]
>     public Guid AffinityKey;
>
>     public byte[] Package;
> }
>
> ICache<string, CacheItem> BufferQueue;
>
> BufferQueue = ignite.GetOrCreateCache<string, CacheItem>(
>     new CacheConfiguration
>     {
>         Name = "BufferQueue",
>         KeepBinaryInStore = true,
>
>         // Partition the entries across the server nodes
>         CacheMode = CacheMode.Partitioned,
>     });
>
> This queue will target a data region that is configured for persistence.
>
> Inbound packages will arrive and be injected into the BufferQueue cache
> from some client node context, like this:
>
> public void HandleANewPackage(string key, Guid affinityKey, byte[] package)
> {
>     BufferQueue.Put(key, new CacheItem
>     {
>         date = DateTime.Now,
>         AffinityKey = affinityKey,
>         Package = package
>     });
> }
>
> There will be a collection of server nodes that are responsible for the
> cache.
>
> This is all straightforward. The tricky bit is then processing the
> elements in the BufferQueue cache.
>
> The data is already on the server nodes, nicely co-located according to
> its affinity. I want to have parallel processing logic that runs on the
> server nodes, pulls elements from the buffer queue, and processes them
> into some other cache(s).
>
> At this point I know I have a cache that may contain items needing to
> be processed, but I don’t know their keys. I know it’s possible to have
> logic running on each server node that does this (either as a Grid Service
> or a Compute::Broadcast() lambda):
>
> var cache = ignite.GetCache<string, CacheItem>("BufferQueue");
>
> var cursor = cache.Query(new ScanQuery<string, CacheItem>(new QueryFilter()));
>
> foreach (var cacheEntry in cursor)
>     ProcessItem(cacheEntry);
>
> …but I am not sure how to restrict the entries returned by the query to
> only those that are affinity co-located with the server asking for them.
>
> Is this so obvious that it just works and does not need documentation, or
> is this not possible, so that I should run the processing from a client
> node context (as above) and pay the penalty of extracting the packages from
> the cache with cache.Query() and then resubmitting them using an
> affinity-aware method like AffinityRun()?
>
> Thanks,
> Raymond.
>
