Hi Raymond,

To process incoming data in a co-located fashion, there is the Continuous Query feature [1]. It looks like it fits your use case quite well.
[1] https://apacheignite-net.readme.io/docs/continuous-queries

On Mon, Apr 23, 2018 at 7:32 AM, Raymond Wilson <[email protected]> wrote:

> I did find the ICache.GetLocalEntries() method and have written the
> following as a proof of concept (yet to exercise it, though):
>
> IEnumerable<ICacheEntry<BufferQueueKey, BufferQueueItem>> localItems =
>     QueueCache.GetLocalEntries(new[] { CachePeekMode.Primary });
>
> ICacheEntry<BufferQueueKey, BufferQueueItem> first = localItems.FirstOrDefault();
>
> if (first != null)
> {
>     // Get the list of all items in the buffer matching the affinity key
>     // of the first item in the list, limiting the result set to 100 TAG files.
>     List<ICacheEntry<BufferQueueKey, BufferQueueItem>> candidates = localItems
>         .Where(x => x.Value.AffinityKey == first.Value.AffinityKey)
>         .Take(100)
>         .ToList();
>
>     if (candidates.Count > 0)
>     {
>         // Submit the list of items to the processor
>         // ...
>     }
> }
>
> This seems like it should do what I want, but I'm a little suspicious that
> it may evaluate the entire content of the cache against the Where()
> condition before taking the first 100 results.
>
> I think I can constrain it by modifying the LINQ expression like this:
>
> List<ICacheEntry<BufferQueueKey, BufferQueueItem>> candidates = localItems
>     .Take(100)
>     .Where(x => x.Value.AffinityKey == first.Value.AffinityKey)
>     .ToList();
>
> which will at least limit the overall number of entries examined to 100,
> though it no longer captures the first 100 entries that do match.
>
> I could further modify it to a 'double take', which still constrains the
> overall query but improves the chances of filling the maximum take of 100
> matching items:
>
> List<ICacheEntry<BufferQueueKey, BufferQueueItem>> candidates = localItems
>     .Take(1000)
>     .Where(x => x.Value.AffinityKey == first.Value.AffinityKey)
>     .Take(100)
>     .ToList();
>
> Or is there a better way?
>
> Thanks,
> Raymond.
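
As a rough illustration of the continuous-query suggestion above, a local listener can react to new entries on the node that owns them, which may replace the polling approach entirely. This is an untested sketch assuming the BufferQueueKey/BufferQueueItem types from the snippet above; the processing logic is left as a stub:

```csharp
using System.Collections.Generic;
using Apache.Ignite.Core;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Event;
using Apache.Ignite.Core.Cache.Query.Continuous;

// Listener invoked on the node that owns the updated entries, so processing
// happens where the data lives.
public class BufferQueueListener : ICacheEntryEventListener<BufferQueueKey, BufferQueueItem>
{
    public void OnEvent(IEnumerable<ICacheEntryEvent<BufferQueueKey, BufferQueueItem>> evts)
    {
        foreach (var evt in evts)
        {
            if (evt.EventType == CacheEntryEventType.Created)
            {
                // Process the newly arrived package here...
            }
        }
    }
}

// Subscription, run on each server node (e.g. from a service or a broadcast):
//
// var cache = ignite.GetCache<BufferQueueKey, BufferQueueItem>("BufferQueue");
// var qry = new ContinuousQuery<BufferQueueKey, BufferQueueItem>(new BufferQueueListener())
// {
//     Local = true  // only receive events for entries this node is primary for
// };
// using (cache.QueryContinuous(qry))
// {
//     // Keep the handle alive for as long as events should be received.
// }
```

Because the listener fires on arrival, there is no need to rediscover unprocessed keys by scanning the cache.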
> *From:* Raymond Wilson [mailto:[email protected]]
> *Sent:* Monday, April 23, 2018 1:11 PM
> *To:* [email protected]
> *Subject:* Using a cache as an affinity co-located processing buffer in Ignite.Net
>
> All,
>
> I have been thinking about how to use Ignite.Net to support an affinity
> co-located ingest pipeline that uses queue buffering to provide fault
> tolerance and buffering for a flow of ingest packages.
>
> At a high level, it looks like this:
>
> Arrival pipeline: [Gateway] -> [PackageReceiver] -> [PackageCache, affinity co-located with PackageProcessor]
> Processing pipeline: [PackageCache] -> [PackageProcessor] -> [ProcessedDataCache, affinity co-located with PackageProcessor]
>
> Essentially, I want a cache that looks like this:
>
> public class CacheItem
> {
>     public DateTime Date;
>
>     [AffinityKeyMapped]
>     public Guid AffinityKey;
>
>     public byte[] Package;
> }
>
> ICache<string, CacheItem> BufferQueue;
>
> BufferQueue = ignite.GetOrCreateCache<string, CacheItem>(
>     new CacheConfiguration
>     {
>         Name = "BufferQueue",
>
>         KeepBinaryInStore = true,
>
>         // Partition the entries across nodes
>         CacheMode = CacheMode.Partitioned,
>     });
>
> This queue will target a data region that is configured for persistence.
>
> Inbound packages will arrive and be injected into the BufferQueue cache
> from some client node context, like this:
>
> public void HandleANewPackage(string key, Guid affinityKey, byte[] package)
> {
>     BufferQueue.Put(key, new CacheItem { Date = DateTime.Now, AffinityKey = affinityKey, Package = package });
> }
>
> There will be a collection of server nodes that are responsible for the cache.
>
> This is all straightforward. The tricky bit is then processing the
> elements in the BufferQueue cache.
>
> The data is already on the server nodes, nicely co-located according to
> its affinity.
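
As an aside on the [AffinityKeyMapped] design quoted above: the placement it produces can be inspected through the ICacheAffinity API, which is handy for convincing yourself that all items sharing an affinity key land on one node. A hedged sketch, untested, assuming a running cluster with the "BufferQueue" cache:

```csharp
using System;
using Apache.Ignite.Core;
using Apache.Ignite.Core.Cache.Affinity;

public static class AffinityCheck
{
    // Show where entries carrying a given affinity key will be stored.
    public static void ShowPlacement(IIgnite ignite, Guid affinityKey)
    {
        ICacheAffinity aff = ignite.GetAffinity("BufferQueue");

        // All CacheItem entries sharing this affinity key map to one partition...
        int partition = aff.GetPartition(affinityKey);

        // ...and therefore to one primary node, where co-located compute can run.
        var primary = aff.MapKeyToNode(affinityKey);

        Console.WriteLine($"Affinity key {affinityKey} -> partition {partition} on node {primary?.Id}");
    }
}
```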
> I want to have parallel processing logic that runs on the server nodes,
> pulls elements from the buffer queue, and processes them into some other
> cache(s).
>
> At this point I know I have a cache that may contain something needing to
> be processed, but I don't know the keys. I know it's possible to have
> logic running on each server node that does this (either as a Grid Service
> or a Compute::Broadcast() lambda):
>
> var cache = ignite.GetCache<string, CacheItem>("BufferQueue");
> var cursor = cache.Query(new ScanQuery<string, CacheItem>(new QueryFilter()));
>
> foreach (var cacheEntry in cursor)
>     ProcessItem(cacheEntry);
>
> ...but I am not sure how to restrict the entries returned by the query to
> only those affinity co-located with the server node asking for them.
>
> Is this so obvious that it just works and does not need documentation? Or
> is it not possible, meaning I should run the processing context from a
> client node context (as above) and pay the penalty of extracting the
> packages from the cache with cache.Query() and then resubmitting them
> using an affinity-aware method like AffinityRun()?
>
> Thanks,
> Raymond.
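
On the specific question of restricting a scan to locally owned entries: ScanQuery exposes a Local flag for exactly this. A minimal, untested sketch using the types from the quoted message (QueryFilter and ProcessItem are the poster's own placeholders, not defined here):

```csharp
using Apache.Ignite.Core;
using Apache.Ignite.Core.Cache.Query;

public static class LocalScan
{
    // Run this on each server node (for example via ICompute.Broadcast);
    // with Local = true the scan only visits data held by the executing node,
    // so no entries are pulled across the network.
    public static void ProcessLocalEntries(IIgnite ignite)
    {
        var cache = ignite.GetCache<string, CacheItem>("BufferQueue");

        var scan = new ScanQuery<string, CacheItem>(new QueryFilter())
        {
            Local = true
        };

        foreach (var cacheEntry in cache.Query(scan))
            ProcessItem(cacheEntry);  // ProcessItem is the poster's placeholder
    }
}
```

ScanQuery also has a Partition property, which can narrow a scan further to a single partition when iterating partitions one at a time.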
