Thanks for the detailed explanation.

I don't know how efficient it would be if you have to filter each record
one by one and then update each record, three times, to keep track of the
status, if you're dealing with millions of records each hour, even if the
cache is partitioned. I guess I will need to benchmark this.  thanks again.

On Wed, Oct 7, 2020 at 12:00 PM Denis Magda <dma...@apache.org> wrote:

> I recalled a complete solution. That's what you would need to do if decide
> to process records in real-time with continuous queries in the
> *fault-tolerant fashion* (using pseudo-code rather than actual Ignite APIs).
>
> First. You need to add a flag field to the record's class that keeps the
> current processing status. Like that:
>
> MyRecord {
> int id;
> Date created;
> byte status; //0 - not processed, 1 - being processed withing a
> continuous query filter, 2 - processed by the filter, all the logic
> successfully completed
> }
>
> Second. The continuous query filter (that will be executed on nodes that
> store a copy of a record) needs to have the following structure.
>
> @IgniteAsyncCallback
> filterMethod(MyRecords updatedRecord) {
>
> if (isThisNodePrimaryNodeForTheRecord(updatedRecord)) { // execute on a
> primary node only
>     updatedRecord.status = 1 // setting the flag to signify the processing
> is started.
>
>     //execute your business logic
>
>     updatedRecord.status = 2 // finished processing
> }
>   return false; //you don't want a notification to be sent to the client
> application or another node that deployed the continuous query
> }
>
> Third. If any node leaves the cluster or the whole cluster is restarted,
> then you need to execute your custom for all the records with status=0 or
> status=1. To do that you can broadcast a compute task:
>
> // Application side
>
> int[] unprocessedRecords = "select id from MyRecord where status < 2;"
>
> IgniteCompute.affinityRun(idsOfUnprocessedRecords, taskWithMyCustomLogic);
> //the task will be executed only on the nodes that store the records
>
> // Server side
>
> taskWithMyCustomLogic() {
>     updatedRecord.status = 1 // setting the flag to signify the processing
> is started.
>
>     //execute your business logic
>
>     updatedRecord.status = 2 // finished processing
> }
>
>
> That's it. So, the third step already requires you to have a compute task
> that would run the calculation in case of failures. Thus, if the real-time
> aspect of the processing is not crucial right now, then you can start with
> the batch-based approach by running a compute task once at a time and then
> introduce the continuous queries-based improvement whenever is needed. You
> decide. Hope it helps.
>
>
> -
> Denis
>
>
> On Wed, Oct 7, 2020 at 9:19 AM Denis Magda <dma...@apache.org> wrote:
>
>> So, a lesson for the future, would the continuous query approach still be
>>> preferable if the calculation involves the cache with continuous query and
>>> say a look up table? For example, if I want to see the country in the cache
>>> employee exists in the list of the countries that I am interested in.
>>
>>
>> You can access other caches from within the filter but the logic has to
>> be executed asynchronously to avoid deadlocks:
>> https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/lang/IgniteAsyncCallback.html
>>
>> Also, what do I need to do if I want the filter for the continuous query
>>> to execute on the cache on the local node only? Say, I have the continuous
>>> query deployed as singleton service on each node, to capture certain
>>> changes to a cache on the local node.
>>
>>
>> <https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/lang/IgniteAsyncCallback.html>
>>
>> The filter will be deployed and executed on every server node. The filter
>> is executed only on a server node that owns a record that is being modified
>> and passed into a filter. Hmm, it's also said that the filter can be
>> executed on a backup node. Check if it's true, and then you need to add a
>> special check into the filter that would allow executing the logic only if
>> it's the primary node:
>>
>> https://ignite.apache.org/docs/latest/key-value-api/continuous-queries#remote-filter
>>
>>
>> -
>> Denis
>>
>>
>> On Wed, Oct 7, 2020 at 4:39 AM narges saleh <snarges...@gmail.com> wrote:
>>
>>> Also, what do I need to do if I want the filter for the continuous query
>>> to execute on the cache on the local node only? Say, I have the continuous
>>> query deployed as singleton service on each node, to capture certain
>>> changes to a cache on the local node.
>>>
>>> On Wed, Oct 7, 2020 at 5:54 AM narges saleh <snarges...@gmail.com>
>>> wrote:
>>>
>>>> Thank you, Denis.
>>>> So, a lesson for the future, would the continuous query approach still
>>>> be preferable if the calculation involves the cache with continuous query
>>>> and say a look up table? For example, if I want to see the country in the
>>>> cache employee exists in the list of the countries that I am interested in.
>>>>
>>>> On Tue, Oct 6, 2020 at 4:11 PM Denis Magda <dma...@apache.org> wrote:
>>>>
>>>>> Thanks
>>>>>
>>>>> Then, I would consider the continuous queries based solution as long
>>>>> as the records can be updated in real-time:
>>>>>
>>>>>    - You can process the records on the fly and don't need to come up
>>>>>    with any batch task.
>>>>>    - The continuous query filter will be executed once on a node that
>>>>>    stores the record's primary copy. If the primary node fails in the 
>>>>> middle
>>>>>    of the filter's calculation execution, then the filter will be 
>>>>> executed on
>>>>>    a backup node. So, you will not lose any updates but might need to
>>>>>    introduce some logic/flag that confirms the calculation is not executed
>>>>>    twice for a single record (this can happen if the primary node failed 
>>>>> in
>>>>>    the middle of the calculation execution and then the backup node 
>>>>> picked up
>>>>>    and started executing the calculation from scratch).
>>>>>    - Updates of other tables or records from within the continuous
>>>>>    query filter must go through an async thread pool. You need to use
>>>>>    IgniteAsyncCallback annotation for that:
>>>>>    
>>>>> https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/lang/IgniteAsyncCallback.html
>>>>>
>>>>> Alternatively, you can always run the calculation in the batch-fashion:
>>>>>
>>>>>    - Run a compute task once in a while
>>>>>    - Read all the latest records that satisfy the requests with SQL
>>>>>    or any other APIs
>>>>>    - Complete the calculation, mark already processed records just in
>>>>>    case if everything is failed in the middle and you need to run the
>>>>>    calculation from scratch
>>>>>
>>>>>
>>>>> -
>>>>> Denis
>>>>>
>>>>>
>>>>> On Mon, Oct 5, 2020 at 8:33 PM narges saleh <snarges...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Denis
>>>>>>  The calculation itself doesn't involve an update or read of another
>>>>>> record, but based on the outcome of the calculation, the process might 
>>>>>> make
>>>>>> changes in some other tables.
>>>>>>
>>>>>> thanks.
>>>>>>
>>>>>> On Mon, Oct 5, 2020 at 7:04 PM Denis Magda <dma...@apache.org> wrote:
>>>>>>
>>>>>>> Good. Another clarification:
>>>>>>>
>>>>>>>    - Does that calculation change the state of the record (updates
>>>>>>>    any fields)?
>>>>>>>    - Does the calculation read or update any other records?
>>>>>>>
>>>>>>> -
>>>>>>> Denis
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Oct 3, 2020 at 1:34 PM narges saleh <snarges...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> The latter; the server needs to perform some calculations on the
>>>>>>>> data without sending any notification to the app.
>>>>>>>>
>>>>>>>> On Fri, Oct 2, 2020 at 4:25 PM Denis Magda <dma...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> And after you detect a record that satisfies the condition, do you
>>>>>>>>> need to send any notification to the application? Or is it more like a
>>>>>>>>> server detects and does some calculation logically without updating 
>>>>>>>>> the app.
>>>>>>>>>
>>>>>>>>> -
>>>>>>>>> Denis
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Oct 2, 2020 at 11:22 AM narges saleh <snarges...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> The detection should happen at most a couple of minutes after a
>>>>>>>>>> record is inserted in the cache but all the detections are local to 
>>>>>>>>>> the
>>>>>>>>>> node. But some records with the current timestamp might show up in 
>>>>>>>>>> the
>>>>>>>>>> system with big delays.
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 2, 2020 at 12:23 PM Denis Magda <dma...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> What are your requirements? Do you need to process the records
>>>>>>>>>>> as soon as they are put into the cluster?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Friday, October 2, 2020, narges saleh <snarges...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thank you Dennis for the reply.
>>>>>>>>>>>> From the perspective of performance/resource overhead and
>>>>>>>>>>>> reliability, which approach is preferable? Does a continuous query 
>>>>>>>>>>>> based
>>>>>>>>>>>> approach impose a lot more overhead?
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Oct 2, 2020 at 9:52 AM Denis Magda <dma...@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Narges,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Use continuous queries if you need to be notified in
>>>>>>>>>>>>> real-time, i.e. 1) a record is inserted, 2) the continuous filter 
>>>>>>>>>>>>> confirms
>>>>>>>>>>>>> the record's time satisfies your condition, 3) the continuous 
>>>>>>>>>>>>> queries
>>>>>>>>>>>>> notifies your application that does require processing.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The jobs are better for a batching use case when it's ok to
>>>>>>>>>>>>> process records together with some delay.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> -
>>>>>>>>>>>>> Denis
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Oct 2, 2020 at 3:50 AM narges saleh <
>>>>>>>>>>>>> snarges...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>>>  If I want to watch for a rolling timestamp pattern in all
>>>>>>>>>>>>>> the records that get inserted to all my caches, is it more 
>>>>>>>>>>>>>> efficient to use
>>>>>>>>>>>>>> timer based jobs (that checks all the records in some interval) 
>>>>>>>>>>>>>> or
>>>>>>>>>>>>>> continuous queries that locally filter on the pattern? These 
>>>>>>>>>>>>>> records can
>>>>>>>>>>>>>> get inserted in any order  and some can arrive with delays.
>>>>>>>>>>>>>> An example is to watch for all the records whose timestamp
>>>>>>>>>>>>>> ends in 50, if the timestamp is in the format yyyy-mm-dd hh:mi.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> thanks
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> -
>>>>>>>>>>> Denis
>>>>>>>>>>>
>>>>>>>>>>>

Reply via email to