ContinuousQuery is the best practice for most kinds of streaming use cases.
I think it fits your use case as well.
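
For reference, the approach discussed in the quoted messages below (do the work inside the remote filter and the ScanQuery filter, and return false so nothing is shipped to the initiator) could look roughly like the following. This is only a minimal sketch under some assumptions: the cache holds byte[] values keyed by string, and Processor, ProcessingEventFilter, ProcessingScanFilter and NoOpListener are hypothetical names made up for illustration, not part of Ignite.NET.

```csharp
using System;
using System.Collections.Generic;
using Apache.Ignite.Core;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Event;
using Apache.Ignite.Core.Cache.Query;
using Apache.Ignite.Core.Cache.Query.Continuous;

// Hypothetical stand-in for the real package processing logic.
static class Processor
{
    public static void Process(string key, byte[] package)
    {
        Console.WriteLine("Processing {0} ({1} bytes)", key, package.Length);
    }
}

// Remote filter: runs on the server nodes for each cache update.
[Serializable]
class ProcessingEventFilter : ICacheEntryEventFilter<string, byte[]>
{
    public bool Evaluate(ICacheEntryEvent<string, byte[]> evt)
    {
        if (evt.EventType == CacheEntryEventType.Created)
            Processor.Process(evt.Key, evt.Value);

        // Returning false means the event is not sent to the local listener.
        return false;
    }
}

// Scan filter for the initial query: handles entries already in the cache.
[Serializable]
class ProcessingScanFilter : ICacheEntryFilter<string, byte[]>
{
    public bool Invoke(ICacheEntry<string, byte[]> entry)
    {
        Processor.Process(entry.Key, entry.Value);

        // Returning false keeps the entry out of the result set.
        return false;
    }
}

// A local listener is required by ContinuousQuery, but it receives nothing here.
class NoOpListener : ICacheEntryEventListener<string, byte[]>
{
    public void OnEvent(IEnumerable<ICacheEntryEvent<string, byte[]>> evts) { }
}

static class Program
{
    static void Main()
    {
        using (var ignite = Ignition.Start())
        {
            var cache = ignite.GetOrCreateCache<string, byte[]>("BufferQueue");

            var qry = new ContinuousQuery<string, byte[]>(
                new NoOpListener(), new ProcessingEventFilter());
            var initialQry = new ScanQuery<string, byte[]>(new ProcessingScanFilter());

            using (var handle = cache.QueryContinuous(qry, initialQry))
            {
                // Drain the initial cursor so the scan filter actually runs;
                // it yields no entries because the filter returns false.
                foreach (var unused in handle.GetInitialQueryCursor()) { }

                cache.Put("pkg-1", new byte[] { 1, 2, 3 });
                Console.ReadLine(); // keep the query alive while packages arrive
            }
        }
    }
}
```

The continuous query only lives as long as the handle, so in practice the handle would be held by whatever long-running context owns the processing.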

On Tue, Apr 24, 2018 at 10:08 AM, Raymond Wilson <[email protected]> wrote:

> Thanks, that makes sense.
>
> From a best practices perspective, is it better to have a grid deployed
> service on each node executing local continuous queries against the cache
> and orchestrating the processing from within the service, versus having
> some singular context in the grid that uses the continuous query by placing
> processing orchestration logic in the filter sent to the remote nodes?
>
> Sent from my iPhone
>
> On 24/04/2018, at 6:53 PM, Pavel Tupitsyn <[email protected]> wrote:
>
> Sorry, looks like I have misunderstood you.
>
> If you need an initial scan, of course you can have it by using a ScanQuery
> as the initialQuery.
> Place all the processing logic into the ScanQuery filter, and return false
> from there.
> This way you can process all existing entries in a co-located fashion
> without sending them to the initiator node.
>
> Thanks,
> Pavel
>
> On Mon, Apr 23, 2018 at 11:50 PM, Raymond Wilson <
> [email protected]> wrote:
>
>> Not being able to do an initial scan of elements on the remote nodes is a
>> bit of a problem (possibly a bug?)
>>
>>
>>
>> Something that’s occurred to me is to wrap this behaviour into an Ignite
>> service deployed onto all of the server nodes, and use a local mode
>> continuous query from within each service to perform an initial scan of
>> elements and then steady state handling as new elements arrive.
>>
>>
>>
>> The reason the initial scan is important is I need to handle cases where
>> there may be a non-trivial queue of items waiting for processing and there
>> is either a shutdown/restart of the grid, or there is a topology change
>> event that triggers rebalancing
>>
>>
>>
>> *From:* Pavel Tupitsyn [mailto:[email protected]]
>> *Sent:* Tuesday, April 24, 2018 5:54 AM
>>
>> *To:* [email protected]
>> *Subject:* Re: Using a cache as an affinity co-located processing buffer
>> in Ignite.Net
>>
>>
>>
>> > Is the initial query also run in the context of the remote node and
>> > the remote filter?
>>
>> No, it is just a query (can be SQL or Scan) which allows you to get a
>> "full picture" on the calling node:
>>
>> all existing data and all future data.
>>
>>
>>
>> So in your scenario it is not very useful.
>>
>>
>>
>> > return false from the filter so the element is not sent to the local
>> > listener
>>
>> Yes, exactly
>>
>>
>>
>> On Mon, Apr 23, 2018 at 11:18 AM, Raymond Wilson <
>> [email protected]> wrote:
>>
>> OK – I see how that works.
>>
>>
>>
>> In the page https://apacheignite-net.readme.io/docs/continuous-queries ,
>> there is this code:
>>
>>
>>
>> using (var queryHandle = cache.QueryContinuous(qry, initialQry))
>> {
>>     // Iterate through existing data stored in cache.
>>     foreach (var entry in queryHandle.GetInitialQueryCursor())
>>         Console.WriteLine("key={0}, val={1}", entry.Key, entry.Value);
>>
>>     // Add a few more keys and watch a few more query notifications.
>>     for (int i = 5; i < 15; i++)
>>         cache.Put(i, i.ToString());
>> }
>>
>>
>>
>> Is the initial query also run in the context of the remote node and the
>> remote filter?
>>
>>
>>
>> Construction of the ContinuousQuery also requires provision of a
>> LocalListener to receive the cache update items. Is the approach here to
>> process the element in the remote filter context and then return false from
>> the filter so the element is not sent to the local listener?
>>
>>
>>
>>
>>
>> *From:* Pavel Tupitsyn [mailto:[email protected]]
>> *Sent:* Monday, April 23, 2018 7:50 PM
>>
>>
>> *To:* [email protected]
>> *Subject:* Re: Using a cache as an affinity co-located processing buffer
>> in Ignite.Net
>>
>>
>>
>> The Remote Filter is deployed on every cache node and is invoked only on
>> the primary node for that key.
>>
>> In other words, for each key there is only one invocation of the remote
>> filter, and that invocation is local to that key.
>>
>>
>>
>> So you can place your processing logic into the Remote Filter.
>>
>>
>>
>> On Mon, Apr 23, 2018 at 10:42 AM, Raymond Wilson <
>> [email protected]> wrote:
>>
>> Hi Pavel,
>>
>>
>>
>> Yes, I looked at continuous queries. They appear to be oriented toward a
>> single context being sent the newly arrived elements in the cache from all
>> primary nodes hosting the cache involved in the query.
>>
>>
>>
>> In the use case I outlined below, I would like to have the items
>> processed in co-located contexts (ie: the data does not move and is
>> processed in situ on the primary node). How do you do that with a
>> continuous query?
>>
>>
>>
>> Thanks,
>>
>> Raymond.
>>
>>
>>
>> *From:* Pavel Tupitsyn [mailto:[email protected]]
>> *Sent:* Monday, April 23, 2018 7:18 PM
>> *To:* [email protected]
>> *Subject:* Re: Using a cache as an affinity co-located processing buffer
>> in Ignite.Net
>>
>>
>>
>> Hi Raymond,
>>
>>
>>
>> To process incoming data in a co-located fashion there is a Continuous
>> Query feature [1].
>>
>> Looks like it fits your use case quite well.
>>
>>
>>
>>
>>
>> [1] https://apacheignite-net.readme.io/docs/continuous-queries
>>
>>
>>
>> On Mon, Apr 23, 2018 at 7:32 AM, Raymond Wilson <
>> [email protected]> wrote:
>>
>> I did find ICache.GetLocalEntries() method and have written the following
>> as a proof of concept (yet to exercise it though):
>>
>>
>>
>>             IEnumerable<ICacheEntry<BufferQueueKey, BufferQueueItem>> localItems =
>>                 QueueCache.GetLocalEntries(new [] {CachePeekMode.Primary});
>>
>>             ICacheEntry<BufferQueueKey, BufferQueueItem> first = localItems.FirstOrDefault();
>>
>>             if (first != null)
>>             {
>>                 // Get the list of all items in the buffer matching the affinity key
>>                 // of the first item in the list, limiting the result set to 100 TAG files.
>>                 List<ICacheEntry<BufferQueueKey, BufferQueueItem>> candidates = localItems
>>                     .Where(x => x.Value.AffinityKey == first.Value.AffinityKey)
>>                     .Take(100)
>>                     .ToList();
>>
>>                 if (candidates?.Count > 0)
>>                 {
>>                     // Submit the list of items to the processor
>>                     // ...
>>                 }
>>             }
>>
>>
>>
>> This seems like it should do what I want, but I’m a little suspicious
>> that it may evaluate the entire content of the cache against the Where()
>> condition before taking the first 100 results.
>>
>>
>>
>> I think I can constrain it by modifying the LINQ expression like this:
>>
>>
>>
>>                 List<ICacheEntry<BufferQueueKey, BufferQueueItem>> candidates = localItems
>>                     .Take(100)
>>                     .Where(x => x.Value.AffinityKey == first.Value.AffinityKey)
>>                     .ToList();
>>
>>
>>
>> Which will at least limit the overall number examined to be 100, while
>> not capturing the first 100 that do match.
>>
>>
>>
>> I could further modify it to a ‘double-take’ which still constrains the
>> overall query but improves the chances of filling the maximum take of 100
>> matching items
>>
>>
>>
>>                 List<ICacheEntry<BufferQueueKey, BufferQueueItem>> candidates = localItems
>>                     .Take(1000)
>>                     .Where(x => x.Value.AffinityKey == first.Value.AffinityKey)
>>                     .Take(100)
>>                     .ToList();
>>
>>
>>
>> Or is there a better way?
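
On the concern above that Where() might be evaluated against the whole cache before Take(100) is applied: LINQ to Objects operators are lazily evaluated, so Where(...).Take(100).ToList() stops pulling from the source as soon as the 100th match has been produced. The following is a minimal sketch that counts predicate evaluations, with a plain integer sequence standing in for the local cache entries. LazyTakeDemo, Source and CountEvaluations are made-up names for illustration, and the sketch assumes the enumerable returned by GetLocalEntries() streams entries rather than materializing them up front.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class LazyTakeDemo
{
    // Stand-in for the local cache entries: a lazily produced sequence.
    static IEnumerable<int> Source(int count)
    {
        for (int i = 0; i < count; i++)
            yield return i;
    }

    // Returns how many times the Where() predicate ran before Take() was satisfied.
    public static int CountEvaluations(int sourceSize, int take)
    {
        int evaluated = 0;

        var result = Source(sourceSize)
            .Where(x => { evaluated++; return x % 2 == 0; }) // matches even numbers
            .Take(take)
            .ToList();

        return evaluated;
    }

    public static void Main()
    {
        // One million source items, but only 199 are examined:
        // the 100th even number is 198, and enumeration stops there.
        Console.WriteLine(LazyTakeDemo.CountEvaluations(1000000, 100)); // prints 199
    }
}
```

So the first variant (Where, then Take) already limits the work to however many entries must be scanned to find 100 matches. Note that Take(100).Where(...) instead examines at most 100 entries and may return fewer than 100 matches.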
>>
>>
>>
>> Thanks,
>>
>> Raymond.
>>
>>
>>
>> *From:* Raymond Wilson [mailto:[email protected]]
>> *Sent:* Monday, April 23, 2018 1:11 PM
>> *To:* [email protected]
>> *Subject:* Using a cache as an affinity co-located processing buffer in
>> Ignite.Net
>>
>>
>>
>> All,
>>
>>
>>
>> I have been thinking about how to use Ignite.Net to support an affinity
>> co-located ingest pipeline that uses queue buffering to provide fault
>> tolerance and buffering for a flow of ingest packages.
>>
>>
>>
>> At a high level, it looks like this:
>>
>>
>>
>> Arrival pipeline: [Gateway] -> [PackageReceiver] -> [PackageCache,
>> affinity co-located with PackageProcessor]
>>
>> Processing pipeline: [PackageCache] -> [PackageProcessor] ->
>> [ProcessedDataCache affinity co-located with PackageProcessor]
>>
>>
>>
>> Essentially, I want a cache that looks like this:
>>
>>
>>
>> public class CacheItem
>> {
>>     public DateTime date;
>>
>>     [AffinityKeyMapped]
>>     public Guid AffinityKey;
>>
>>     public byte[] Package;
>> }
>>
>>
>>
>> ICache<string, CacheItem> BufferQueue;
>>
>> BufferQueue = ignite.GetOrCreateCache<string, CacheItem>(
>>     new CacheConfiguration
>>     {
>>         Name = "BufferQueue",
>>         KeepBinaryInStore = true,
>>
>>         // Partition the cache entries across the server nodes
>>         CacheMode = CacheMode.Partitioned,
>>     });
>>
>>
>>
>> This queue will target a data region that is configured for persistency.
>>
>>
>>
>> Inbound packages will arrive and be injected into the BufferQueue cache
>> from some client node context, like this:
>>
>>
>>
>> public void HandleANewPackage(string key, Guid affinityKey, byte[] package)
>> {
>>     BufferQueue.Put(key, new CacheItem { date = DateTime.Now, AffinityKey = affinityKey, Package = package });
>> }
>>
>>
>>
>> There will be a collection of server nodes that are responsible for the
>> cache.
>>
>>
>>
>> This is all straightforward. The tricky bit is then processing the
>> elements in the BufferQueue cache.
>>
>>
>>
>> The data is already on the server nodes, nicely co-located according to
>> its affinity. I want to have parallel processing logic that runs on the
>> server nodes that pulls elements from the buffer queue and processes them
>> into some other cache(s).
>>
>>
>>
>> At this point I know I have a cache that may contain something needing to
>> be processed, but I don’t know their keys. I know it’s possible to have
>> logic running on each server node that does this (either as a Grid Service
>> or a Compute::Broadcast() lambda):
>>
>>
>>
>> var cache = ignite.GetCache<string, CacheItem>("BufferQueue");
>> var cursor = cache.Query(new ScanQuery<string, CacheItem>(new QueryFilter()));
>>
>> foreach (var cacheEntry in cursor)
>>     ProcessItem(cacheEntry);
>>
>>
>>
>> …but I am not sure how to restrict the elements in the cache returned to
>> the query to be only those entries affinity co-located with the server
>> asking for them.
>>
>>
>>
>> Is this so obvious that it just works and does not need documentation, or
>> is this not possible and I should run the processing context from a client
>> node context (as above) and pay the penalty of extracting the packages from
>> the cache with cache.Query() and then resubmitting them using an affinity
>> aware method like AffinityRun()?
>>
>>
>>
>> Thanks,
>>
>> Raymond.
