ContinuousQuery is the best practice for most kinds of streaming use cases. I think it fits your use case as well.
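The steady-state half of the pattern recommended through this thread can be sketched as follows. This is only an illustration against the Ignite.Net continuous-query API: `CacheItem` is the type defined later in the thread, and `Processor.ProcessLocally` is a hypothetical stand-in for the application's own processing logic, not part of any Ignite API.

```csharp
using System;
using System.Collections.Generic;
using Apache.Ignite.Core.Cache.Event;
using Apache.Ignite.Core.Cache.Query.Continuous;

// Deployed to every cache node; invoked only on the primary node for each
// updated key, i.e. co-located with the data.
[Serializable]
public class ProcessingFilter : ICacheEntryEventFilter<string, CacheItem>
{
    public bool Evaluate(ICacheEntryEvent<string, CacheItem> evt)
    {
        Processor.ProcessLocally(evt.Key, evt.Value); // hypothetical hook

        // Returning false prevents the event from being sent to the local
        // listener on the node that started the query.
        return false;
    }
}

// Required by the ContinuousQuery constructor, but never fires here because
// the remote filter swallows every event.
public class NoOpListener : ICacheEntryEventListener<string, CacheItem>
{
    public void OnEvent(IEnumerable<ICacheEntryEvent<string, CacheItem>> evts) { }
}

// Usage:
var qry = new ContinuousQuery<string, CacheItem>(new NoOpListener())
{
    Filter = new ProcessingFilter()
};

using (cache.QueryContinuous(qry))
{
    // Keep the handle (and with it the query) alive while processing should run.
}
```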
On Tue, Apr 24, 2018 at 10:08 AM, Raymond Wilson <[email protected]> wrote:

Thanks, that makes sense.

From a best-practices perspective, is it better to have a grid-deployed service on each node executing local continuous queries against the cache and orchestrating the processing from within the service, or to have some singular context in the grid use the continuous query, placing the processing orchestration logic in the filter sent to the remote nodes?

Sent from my iPhone

On 24/04/2018, at 6:53 PM, Pavel Tupitsyn <[email protected]> wrote:

Sorry, it looks like I misunderstood you.

If you need an initial scan, you can have it by using a ScanQuery as the initialQuery. Place all the processing logic into the ScanQuery filter, and return false from there. This way you can process all existing entries in a co-located fashion without sending them to the initiator node.

Thanks,
Pavel

On Mon, Apr 23, 2018 at 11:50 PM, Raymond Wilson <[email protected]> wrote:

Not being able to do an initial scan of elements on the remote nodes is a bit of a problem (possibly a bug?).

Something that has occurred to me is to wrap this behaviour into an Ignite service deployed onto all of the server nodes, and use a local-mode continuous query from within each service to perform an initial scan of elements, followed by steady-state handling as new elements arrive.

The reason the initial scan is important is that I need to handle cases where there may be a non-trivial queue of items waiting for processing and there is either a shutdown/restart of the grid, or a topology change event that triggers rebalancing.

From: Pavel Tupitsyn [mailto:[email protected]]
Sent: Tuesday, April 24, 2018 5:54 AM
To: [email protected]
Subject: Re: Using a cache as an affinity co-located processing buffer in Ignite.Net

> Is the initial query also run in the context of the remote node and the remote filter?

No, it is just a query (it can be SQL or Scan) which allows you to get a "full picture" on the calling node: all existing data and all future data. So in your scenario it is not very useful.

> return false from the filter so the element is not sent to the local listener

Yes, exactly.

On Mon, Apr 23, 2018 at 11:18 AM, Raymond Wilson <[email protected]> wrote:

OK – I see how that works.

The page https://apacheignite-net.readme.io/docs/continuous-queries contains this code:

    using (var queryHandle = cache.QueryContinuous(qry, initialQry))
    {
        // Iterate through existing data stored in cache.
        foreach (var entry in queryHandle.GetInitialQueryCursor())
            Console.WriteLine("key={0}, val={1}", entry.Key, entry.Value);

        // Add a few more keys and watch a few more query notifications.
        for (int i = 5; i < 15; i++)
            cache.Put(i, i.ToString());
    }

Is the initial query also run in the context of the remote node and the remote filter?

Construction of the ContinuousQuery also requires provision of a LocalListener to receive the cache update items. Is the approach here to process the element in the remote filter context and then return false from the filter, so the element is not sent to the local listener?
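The initial-scan approach Pavel describes can be sketched like this. Again, this is only an illustration: `Processor.ProcessLocally` is a hypothetical processing hook, and `qry` is assumed to be a ContinuousQuery whose remote filter handles newly arriving entries the same way.

```csharp
using System;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Query;

// Invoked on the node that owns each existing entry, so the entry is
// processed in place and never travels to the initiator node.
[Serializable]
public class InitialScanFilter : ICacheEntryFilter<string, CacheItem>
{
    public bool Invoke(ICacheEntry<string, CacheItem> entry)
    {
        Processor.ProcessLocally(entry.Key, entry.Value); // hypothetical hook
        return false; // exclude the entry from the initial query cursor
    }
}

// Usage, with qry being the ContinuousQuery for steady-state processing:
var initialQry = new ScanQuery<string, CacheItem>(new InitialScanFilter());

using (var handle = cache.QueryContinuous(qry, initialQry))
{
    // The cursor must be iterated to drive the scan, but it yields nothing
    // because the filter returned false for every entry.
    foreach (var _ in handle.GetInitialQueryCursor()) { }
}
```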
From: Pavel Tupitsyn [mailto:[email protected]]
Sent: Monday, April 23, 2018 7:50 PM
To: [email protected]
Subject: Re: Using a cache as an affinity co-located processing buffer in Ignite.Net

The remote filter is deployed on every cache node and is invoked only on the primary node for a given key. In other words, for each key there is only one invocation of the remote filter, and that invocation is local to that key. So you can place your processing logic into the remote filter.

On Mon, Apr 23, 2018 at 10:42 AM, Raymond Wilson <[email protected]> wrote:

Hi Pavel,

Yes, I looked at continuous queries. They appear to be oriented toward a single context being sent the newly arrived cache elements from all primary nodes hosting the cache involved in the query.

In the use case I outlined below, I would like to have the items processed in co-located contexts (ie: the data does not move and is processed in situ on the primary node). How do you do that with a continuous query?

Thanks,
Raymond.

From: Pavel Tupitsyn [mailto:[email protected]]
Sent: Monday, April 23, 2018 7:18 PM
To: [email protected]
Subject: Re: Using a cache as an affinity co-located processing buffer in Ignite.Net

Hi Raymond,

To process incoming data in a co-located fashion there is the Continuous Query feature [1]. It looks like it fits your use case quite well.

[1] https://apacheignite-net.readme.io/docs/continuous-queries

On Mon, Apr 23, 2018 at 7:32 AM, Raymond Wilson <[email protected]> wrote:

I did find the ICache.GetLocalEntries() method and have written the following as a proof of concept (yet to exercise it, though):

    IEnumerable<ICacheEntry<BufferQueueKey, BufferQueueItem>> localItems =
        QueueCache.GetLocalEntries(new[] { CachePeekMode.Primary });

    ICacheEntry<BufferQueueKey, BufferQueueItem> first = localItems.FirstOrDefault();

    if (first != null)
    {
        // Get the list of all items in the buffer matching the affinity key of
        // the first item in the list, limiting the result set to 100 TAG files.
        List<ICacheEntry<BufferQueueKey, BufferQueueItem>> candidates = localItems
            .Where(x => x.Value.AffinityKey == first.Value.AffinityKey)
            .Take(100)
            .ToList();

        if (candidates.Count > 0)
        {
            // Submit the list of items to the processor
            // ...
        }
    }

This seems like it should do what I want, but I'm a little suspicious that it may evaluate the entire content of the cache against the Where() condition before taking the first 100 results.

I think I can constrain it by modifying the LINQ expression like this:

    List<ICacheEntry<BufferQueueKey, BufferQueueItem>> candidates = localItems
        .Take(100)
        .Where(x => x.Value.AffinityKey == first.Value.AffinityKey)
        .ToList();

which will at least limit the overall number examined to 100, while not necessarily capturing the first 100 that do match.

I could further modify it to a 'double take', which still constrains the overall query but improves the chances of filling the maximum take of 100 matching items:

    List<ICacheEntry<BufferQueueKey, BufferQueueItem>> candidates = localItems
        .Take(1000)
        .Where(x => x.Value.AffinityKey == first.Value.AffinityKey)
        .Take(100)
        .ToList();

Or is there a better way?

Thanks,
Raymond.

From: Raymond Wilson [mailto:[email protected]]
Sent: Monday, April 23, 2018 1:11 PM
To: [email protected]
Subject: Using a cache as an affinity co-located processing buffer in Ignite.Net

All,

I have been thinking about how to use Ignite.Net to support an affinity co-located ingest pipeline that uses queue buffering to provide fault tolerance and buffering for a flow of ingest packages.

At a high level, it looks like this:

Arrival pipeline: [Gateway] -> [PackageReceiver] -> [PackageCache, affinity co-located with PackageProcessor]
Processing pipeline: [PackageCache] -> [PackageProcessor] -> [ProcessedDataCache, affinity co-located with PackageProcessor]

Essentially, I want a cache that looks like this:

    public class CacheItem
    {
        public DateTime Date;

        [AffinityKeyMapped]
        public Guid AffinityKey;

        public byte[] Package;
    }

    ICache<string, CacheItem> BufferQueue;

    BufferQueue = ignite.GetOrCreateCache<string, CacheItem>(
        new CacheConfiguration
        {
            Name = "BufferQueue",

            KeepBinaryInStore = true,

            // Partition the data across the nodes
            CacheMode = CacheMode.Partitioned,
        });

This queue will target a data region that is configured for persistence.
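A persistence-enabled data region such as the one mentioned above can be declared roughly as follows. This is a sketch only: the region name "bufferQueueRegion" is an arbitrary choice for illustration, and the cache opts in via CacheConfiguration.DataRegionName.

```csharp
using Apache.Ignite.Core;
using Apache.Ignite.Core.Cache.Configuration;
using Apache.Ignite.Core.Configuration;

var cfg = new IgniteConfiguration
{
    DataStorageConfiguration = new DataStorageConfiguration
    {
        DataRegionConfigurations = new[]
        {
            new DataRegionConfiguration
            {
                Name = "bufferQueueRegion", // arbitrary name
                PersistenceEnabled = true   // data survives restarts
            }
        }
    }
};

var cacheCfg = new CacheConfiguration
{
    Name = "BufferQueue",
    CacheMode = CacheMode.Partitioned,
    DataRegionName = "bufferQueueRegion"
};

using (var ignite = Ignition.Start(cfg))
{
    // With persistence enabled the cluster starts inactive and must be
    // activated once the expected server nodes have joined.
    ignite.GetCluster().SetActive(true);

    var bufferQueue = ignite.GetOrCreateCache<string, CacheItem>(cacheCfg);
}
```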
Inbound packages will arrive and be injected into the BufferQueue cache from some client node context, like this:

    public void HandleANewPackage(string key, Guid affinityKey, byte[] package)
    {
        BufferQueue.Put(key, new CacheItem
        {
            Date = DateTime.Now,
            AffinityKey = affinityKey,
            Package = package
        });
    }

There will be a collection of server nodes that are responsible for the cache.

This is all straightforward. The tricky bit is then processing the elements in the BufferQueue cache.

The data is already on the server nodes, nicely co-located according to its affinity. I want to have parallel processing logic that runs on the server nodes, pulling elements from the buffer queue and processing them into some other cache(s).

At this point I know I have a cache that may contain something needing to be processed, but I don't know the keys. I know it's possible to have logic running on each server node that does this (either as a Grid Service or a Compute::Broadcast() lambda):

    var cache = ignite.GetCache<string, CacheItem>("BufferQueue");
    var cursor = cache.Query(new ScanQuery<string, CacheItem>(new QueryFilter()));

    foreach (var cacheEntry in cursor)
        ProcessItem(cacheEntry);

...but I am not sure how to restrict the elements returned by the query to only those entries affinity co-located with the server asking for them.

Is this so obvious that it just works and does not need documentation, or is it not possible, meaning I should run the processing context from a client node context (as above) and pay the penalty of extracting the packages from the cache with cache.Query() and then resubmitting them using an affinity-aware method like AffinityRun()?

Thanks,
Raymond.
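One way to approach the co-location question in the message above: Ignite queries expose a Local flag, so broadcasting a compute action that runs a local ScanQuery lets each server node iterate only the data it holds. This is a sketch under that assumption, not a verified recipe; `Processor.ProcessItem` is a hypothetical per-entry handler standing in for ProcessItem above.

```csharp
using System;
using Apache.Ignite.Core;
using Apache.Ignite.Core.Cache.Query;
using Apache.Ignite.Core.Compute;
using Apache.Ignite.Core.Resource;

[Serializable]
public class LocalScanAction : IComputeAction
{
    // Injected by Ignite on the node where the action executes.
    [InstanceResource] private IIgnite _ignite;

    public void Invoke()
    {
        var cache = _ignite.GetCache<string, CacheItem>("BufferQueue");

        // Local = true restricts the scan to this node's data, so entries
        // are processed where they live instead of being shipped to a caller.
        var cursor = cache.Query(new ScanQuery<string, CacheItem> { Local = true });

        foreach (var entry in cursor)
            Processor.ProcessItem(entry); // hypothetical processing hook
    }
}

// Run the scan once on every node that participates in the cache:
ignite.GetCompute().Broadcast(new LocalScanAction());
```

Alternatively, ICompute.AffinityRun(cacheName, affinityKey, action), mentioned at the end of the post, routes a closure to the primary node for a given affinity key; that fits when the keys (or at least the affinity keys) are already known to the caller.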
