> Is the initial query also run in the context of the remote node and the remote filter? No, it is just a query (can be SQL or Scan) which allows you to get a "full picture" on the calling node: all existing data and all future data.
So in your scenario it is not very useful. > return false from the filter so the element is not sent to the local listener Yes, exactly On Mon, Apr 23, 2018 at 11:18 AM, Raymond Wilson <[email protected] > wrote: > OK – I see how that works. > > > > In the page https://apacheignite-net.readme.io/docs/continuous-queries , > there is this code: > > > > using (var queryHandle = cache.QueryContinuous(qry, initialQry)) > > { > > // Iterate through existing data stored in cache. > > foreach (var entry in queryHandle.GetInitialQueryCursor()) > > Console.WriteLine("key={0}, val={1}", entry.Key, entry.Value); > > > > // Add a few more keys and watch a few more query notifications. > > for (int i = 5; i < 15; i++) > > cache.Put(i, i.ToString()); > > } > > > > Is the initial query also run in the context of the remote node and the > remote filter? > > > > Construction of the ContinuousQuery also requires provision of > LocalListener to receive the cache update items. Is the approach here to > processing the element in the remote filter context then return false from > the filter so the element is not sent to the local listener? > > > > > > *From:* Pavel Tupitsyn [mailto:[email protected]] > *Sent:* Monday, April 23, 2018 7:50 PM > > *To:* [email protected] > *Subject:* Re: Using a cache as an affinity co-located processing buffer > in Ignite.Net > > > > Remote Listener is deployed on every cache node and is invoked only on a > primary node for that key. > > In other words, for each key there is only one invocation of the remote > filter, and that invocation is local to that key. > > > > So you can place your processing logic into the Remote Filter. > > > > On Mon, Apr 23, 2018 at 10:42 AM, Raymond Wilson < > [email protected]> wrote: > > Hi Pavel, > > > > Yes, I looked at continuous queries. They appear to be oriented toward a > single context being sent the newly arrived elements in the cache from all > primary nodes hosting the cache involved in the query. > > > > In the use case I outlined below, I would like to have the items processed > in co-located contexts (ie: the data does not move and is processed in situ > on the primary node). How do you do that with a continuous query? > > > > Thanks, > > Raymond. > > > > *From:* Pavel Tupitsyn [mailto:[email protected]] > *Sent:* Monday, April 23, 2018 7:18 PM > *To:* [email protected] > *Subject:* Re: Using a cache as an affinity co-located processing buffer > in Ignite.Net > > > > Hi Raymond, > > > > To process incoming data in a co-located fashion there is a Continuous > Query feature [1]. > > Looks like it fits your use case quite well. > > > > > > [1] https://apacheignite-net.readme.io/docs/continuous-queries > > > > On Mon, Apr 23, 2018 at 7:32 AM, Raymond Wilson < > [email protected]> wrote: > > I did find ICache.GetLocalEntries() method and have written the following > as a proof of concept (yet to exercise it though): > > > > IEnumerable<ICacheEntry<BufferQueueKey, BufferQueueItem>> > localItems = QueueCache.GetLocalEntries(new [] {CachePeekMode.Primary}); > > > > ICacheEntry<BufferQueueKey, BufferQueueItem> first = > localItems.FirstOrDefault(); > > > > if (first != null) > > { > > // Get the list of all items in the buffer matching the > affinity key of the first item > > // in the list, limiting the result set to 100 TAG files. > > List<ICacheEntry<BufferQueueKey, BufferQueueItem>> > candidates = localItems > > .Where(x => x.Value.AffinityKey == > first.Value.AffinityKey) > > .Take(100) > > .ToList(); > > > > if (candidates?.Count > 0) > > { > > // Submit the list of items to the processor > > // ... > > } > > } > > > > This seems like it should do what I want, but I’m a little suspicious that > it may evaluate the entire content of the cache against the Where() > condition before taking the first 100 results. > > > > I think I can constrain it by modifying the LINQ expression like this: > > > > List<ICacheEntry<BufferQueueKey, BufferQueueItem>> > candidates = localItems > > .Take(100) > > .Where(x => x.Value.AffinityKey == > first.Value.AffinityKey) > > .ToList(); > > > > Which will at least limit the overall number examined to be 100, while not > capturing the first 100 that do match. > > > > I could further modify it to a ‘double-take’ which still constrains the > overall query but improves the chances of filling the maximum take of 100 > matching items > > > > List<ICacheEntry<BufferQueueKey, BufferQueueItem>> > candidates = localItems > > .Take(1000) > > .Where(x => x.Value.AffinityKey == > first.Value.AffinityKey) > > .Take(100) > > .ToList(); > > > > Or is there a better way? > > > > Thanks, > > Raymond. > > > > *From:* Raymond Wilson [mailto:[email protected]] > *Sent:* Monday, April 23, 2018 1:11 PM > *To:* [email protected] > *Subject:* Using a cache as an affinity co-located processing buffer in > Ignite.Net > > > > All, > > > > I have been thinking about how to use Ignite.Net to support an affinity > co-located ingest pipeline that uses queue buffering to provide fault > tolerance and buffering for a flow of ingest packages. > > > > At a high level, it looks like this: > > > > Arrival pipeline: [Gateway] -> [PackageReceiver] -> [PackageCache, > affinity co-located with PackageProcessor] > > Processing pipeline: [PackageCache] -> [PackageProcessor] -> > [ProcessedDataCache affinity co-located with PackageProcessor] > > > > Essentially, I want a cache that look like this: > > > > Public class CacheItem > > { > > Public DateTime date; > > > > [AffinityKeyMapped] > > public Guid AffinityKey; > > > > public byte [] Package; > > } > > > > ICache<string, CacheTime> BufferQueue. > > > > BufferQueue = ignite.GetOrCreateCache <string, CacheItem > ( > > new CacheConfiguration > > { > > Name = “BufferQueue”, > > > > KeepBinaryInStore = true, > > > > // Replicate the maps across nodes > > CacheMode = CacheMode.Partitioned, > > }); > > } > > > > This queue will target a data region that is configured for persistency. > > > > Inbound packages will arrive and be injected into the BufferQueue cache > from some client node context, like this: > > > > public void HandleANewPackage(string key, Guid affinityKey, byte [] > package) > > { > > BufferQueue.Put(key, new CacheItem() {data = DateTime.Now(), AffinityKey = > affinityKey, Package = package}); > > } > > > > There will be a collection of server nodes that are responsible for the > cache. > > > > This is all straightforward. The tricky bit is then processing the > elements in the BufferQueue cache. > > > > The data is already on the server nodes, nicely co-located according to > its affinity. I want to have parallel processing logic that runs on the > server nodes that pulls elements from the buffer queue and processes them > into some other cache(s). > > > > At this point I know I have a cache that may contain something needing to > be processed, but I don’t know their keys. I know it’s possible to have > logic running on each server node that does this (either as a Grid Service > or a Compute::Broadcast() lambda): > > > > var cache = ignite.GetCache<string, CacheItem>("BufferQueue"); > > var cursor = cache.Query(new ScanQuery<string, CacheItem >(new QueryFilter > ())); > > > > foreach (var cacheEntry in cursor) > > ProcessItem(CacheEntry); > > > > …but I am not sure how to restrict the elements in the cache returned to > the query to be only those entries affinity co-located with the server > asking for them. > > > > Is this so obvious that it just works and does not need documentation, or > is this not possible and I should run the processing context from a client > node context (as above) and pay the penalty of extracting the packages from > the cache with cache.Query() and then resubmitting them using an affinity > aware method like AffinityRun()? > > > > Thanks, > > Raymond. > > > > > > > > >
