I recalled a complete solution. Here is what you would need to do if you
decide to process records in real time with continuous queries in a
*fault-tolerant fashion* (this is pseudo-code rather than actual Ignite APIs).

First. You need to add a flag field to the record's class that keeps the
current processing status. Like this:

MyRecord {
    int id;
    Date created;
    byte status; // 0 - not processed
                 // 1 - being processed within a continuous query filter
                 // 2 - processed by the filter, all the logic successfully completed
}

Second. The continuous query filter (that will be executed on nodes that
store a copy of a record) needs to have the following structure.

@IgniteAsyncCallback
filterMethod(MyRecord updatedRecord) {
    // execute on a primary node only
    if (isThisNodePrimaryNodeForTheRecord(updatedRecord)) {
        updatedRecord.status = 1; // the processing has started

        // execute your business logic

        updatedRecord.status = 2; // finished processing
    }

    // you don't want a notification to be sent to the client application
    // or to another node that deployed the continuous query
    return false;
}
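
The same flag can also guard against running the calculation twice for one record, for instance when a backup node re-runs the filter after the primary node fails. Below is a minimal plain-Java sketch of that guard. It does not use any Ignite APIs; the class StatusGuardSketch, the method processOnce, and the counter are hypothetical names made up for illustration, and the record is a simplified stand-in for MyRecord.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java stand-in for the status flag logic; no Ignite APIs are used here.
class StatusGuardSketch {
    static final byte NOT_PROCESSED = 0;
    static final byte IN_PROGRESS = 1;
    static final byte PROCESSED = 2;

    // Hypothetical record type mirroring the MyRecord pseudo-code.
    static class MyRecord {
        final int id;
        byte status = NOT_PROCESSED;

        MyRecord(int id) {
            this.id = id;
        }
    }

    // Counts how many times the placeholder business logic actually ran.
    static final AtomicInteger businessLogicRuns = new AtomicInteger();

    // Returns true if the logic ran, false if the record was already processed.
    static boolean processOnce(MyRecord rec) {
        if (rec.status == PROCESSED)
            return false; // a previous run completed; skip to avoid double execution

        rec.status = IN_PROGRESS;
        businessLogicRuns.incrementAndGet(); // placeholder for the real calculation
        rec.status = PROCESSED;
        return true;
    }

    public static void main(String[] args) {
        MyRecord rec = new MyRecord(1);
        System.out.println(processOnce(rec)); // first call runs the logic
        System.out.println(processOnce(rec)); // second call is skipped
    }
}
```

In a real filter the status change would have to be written back to the cache to survive a node failure; the sketch only shows the order of the checks and transitions.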

Third. If any node leaves the cluster or the whole cluster is restarted,
then you need to execute your custom logic for all the records with status=0
or status=1. To do that you can broadcast a compute task:

// Application side

int[] idsOfUnprocessedRecords = "select id from MyRecord where status < 2;"

// the task will be executed only on the nodes that store the records
IgniteCompute.affinityRun(idsOfUnprocessedRecords, taskWithMyCustomLogic);

// Server side

taskWithMyCustomLogic() {
    updatedRecord.status = 1; // the processing has started

    // execute your business logic

    updatedRecord.status = 2; // finished processing
}
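
To make the recovery pass concrete, here is a small plain-Java simulation of it. A Map stands in for the cache and a loop stands in for the SQL query and the affinity-based task execution, so nothing here is an actual Ignite call; RecoverySketch, statusById, findUnprocessedIds and recover are hypothetical names used only for this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of the recovery pass; a Map stands in for the cache.
class RecoverySketch {
    // Hypothetical in-memory "cache": record id -> status (0, 1 or 2).
    static final Map<Integer, Byte> statusById = new LinkedHashMap<>();

    // Stand-in for "select id from MyRecord where status < 2".
    static List<Integer> findUnprocessedIds() {
        List<Integer> ids = new ArrayList<>();
        for (Map.Entry<Integer, Byte> e : statusById.entrySet())
            if (e.getValue() < 2)
                ids.add(e.getKey());
        return ids;
    }

    // Stand-in for taskWithMyCustomLogic, applied to a single record id.
    static void taskWithMyCustomLogic(int id) {
        statusById.put(id, (byte) 1); // processing started
        // ... business logic would go here ...
        statusById.put(id, (byte) 2); // processing finished
    }

    // Re-runs the logic for every record left at status 0 or 1
    // and returns the ids it touched.
    static List<Integer> recover() {
        List<Integer> ids = findUnprocessedIds();
        for (int id : ids)
            taskWithMyCustomLogic(id);
        return ids;
    }

    public static void main(String[] args) {
        statusById.put(1, (byte) 2); // finished before the failure
        statusById.put(2, (byte) 1); // interrupted mid-processing
        statusById.put(3, (byte) 0); // never picked up
        System.out.println("Recovered ids: " + recover());
    }
}
```

Records that already reached status 2 are left alone, which is what makes it safe to run the same pass again after another failure.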


That's it. Note that the third step already requires you to have a compute
task that runs the calculation in case of failures. Thus, if the real-time
aspect of the processing is not crucial right now, you can start with the
batch-based approach by running a compute task once in a while and then
introduce the continuous queries-based improvement whenever it is needed. You
decide. Hope it helps.


-
Denis


On Wed, Oct 7, 2020 at 9:19 AM Denis Magda <dma...@apache.org> wrote:

> So, a lesson for the future, would the continuous query approach still be
>> preferable if the calculation involves the cache with continuous query and
>> say a look up table? For example, if I want to see the country in the cache
>> employee exists in the list of the countries that I am interested in.
>
>
> You can access other caches from within the filter but the logic has to be
> executed asynchronously to avoid deadlocks:
> https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/lang/IgniteAsyncCallback.html
>
> Also, what do I need to do if I want the filter for the continuous query
>> to execute on the cache on the local node only? Say, I have the continuous
>> query deployed as singleton service on each node, to capture certain
>> changes to a cache on the local node.
>
>
> The filter will be deployed on every server node, but it is executed only on
> a server node that owns the record being modified and passed into the
> filter. Hmm, it's also said that the filter can be executed on a backup
> node. Check if that's true; if so, you need to add a special check to the
> filter that allows executing the logic only on the primary node:
>
> https://ignite.apache.org/docs/latest/key-value-api/continuous-queries#remote-filter
>
>
> -
> Denis
>
>
> On Wed, Oct 7, 2020 at 4:39 AM narges saleh <snarges...@gmail.com> wrote:
>
>> Also, what do I need to do if I want the filter for the continuous query
>> to execute on the cache on the local node only? Say, I have the continuous
>> query deployed as singleton service on each node, to capture certain
>> changes to a cache on the local node.
>>
>> On Wed, Oct 7, 2020 at 5:54 AM narges saleh <snarges...@gmail.com> wrote:
>>
>>> Thank you, Denis.
>>> So, a lesson for the future, would the continuous query approach still
>>> be preferable if the calculation involves the cache with continuous query
>>> and say a look up table? For example, if I want to see the country in the
>>> cache employee exists in the list of the countries that I am interested in.
>>>
>>> On Tue, Oct 6, 2020 at 4:11 PM Denis Magda <dma...@apache.org> wrote:
>>>
>>>> Thanks
>>>>
>>>> Then, I would consider the continuous queries based solution as long as
>>>> the records can be updated in real-time:
>>>>
>>>>    - You can process the records on the fly and don't need to come up
>>>>    with any batch task.
>>>>    - The continuous query filter will be executed once on a node that
>>>>    stores the record's primary copy. If the primary node fails in the
>>>>    middle of the filter's calculation execution, then the filter will be
>>>>    executed on a backup node. So, you will not lose any updates but might
>>>>    need to introduce some logic/flag that confirms the calculation is not
>>>>    executed twice for a single record (this can happen if the primary node
>>>>    failed in the middle of the calculation execution and then the backup
>>>>    node picked up and started executing the calculation from scratch).
>>>>    - Updates of other tables or records from within the continuous
>>>>    query filter must go through an async thread pool. You need to use
>>>>    IgniteAsyncCallback annotation for that:
>>>>    
>>>> https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/lang/IgniteAsyncCallback.html
>>>>
>>>> Alternatively, you can always run the calculation in the batch-fashion:
>>>>
>>>>    - Run a compute task once in a while
>>>>    - Read all the latest records that satisfy the requests with SQL or
>>>>    any other APIs
>>>>    - Complete the calculation and mark already processed records, in
>>>>    case everything fails in the middle and you need to run the
>>>>    calculation from scratch
>>>>
>>>>
>>>> -
>>>> Denis
>>>>
>>>>
>>>> On Mon, Oct 5, 2020 at 8:33 PM narges saleh <snarges...@gmail.com>
>>>> wrote:
>>>>
>>>>> Denis
>>>>>  The calculation itself doesn't involve an update or read of another
>>>>> record, but based on the outcome of the calculation, the process might
>>>>> make changes in some other tables.
>>>>>
>>>>> thanks.
>>>>>
>>>>> On Mon, Oct 5, 2020 at 7:04 PM Denis Magda <dma...@apache.org> wrote:
>>>>>
>>>>>> Good. Another clarification:
>>>>>>
>>>>>>    - Does that calculation change the state of the record (updates
>>>>>>    any fields)?
>>>>>>    - Does the calculation read or update any other records?
>>>>>>
>>>>>> -
>>>>>> Denis
>>>>>>
>>>>>>
>>>>>> On Sat, Oct 3, 2020 at 1:34 PM narges saleh <snarges...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> The latter; the server needs to perform some calculations on the
>>>>>>> data without sending any notification to the app.
>>>>>>>
>>>>>>> On Fri, Oct 2, 2020 at 4:25 PM Denis Magda <dma...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> And after you detect a record that satisfies the condition, do you
>>>>>>>> need to send any notification to the application? Or is it more like
>>>>>>>> a server detects and does some calculation logically without
>>>>>>>> updating the app.
>>>>>>>>
>>>>>>>> -
>>>>>>>> Denis
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Oct 2, 2020 at 11:22 AM narges saleh <snarges...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> The detection should happen at most a couple of minutes after a
>>>>>>>>> record is inserted in the cache but all the detections are local to
>>>>>>>>> the node. But some records with the current timestamp might show up
>>>>>>>>> in the system with big delays.
>>>>>>>>>
>>>>>>>>> On Fri, Oct 2, 2020 at 12:23 PM Denis Magda <dma...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> What are your requirements? Do you need to process the records as
>>>>>>>>>> soon as they are put into the cluster?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Friday, October 2, 2020, narges saleh <snarges...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thank you Dennis for the reply.
>>>>>>>>>>> From the perspective of performance/resource overhead and
>>>>>>>>>>> reliability, which approach is preferable? Does a continuous query
>>>>>>>>>>> based approach impose a lot more overhead?
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Oct 2, 2020 at 9:52 AM Denis Magda <dma...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Narges,
>>>>>>>>>>>>
>>>>>>>>>>>> Use continuous queries if you need to be notified in real-time,
>>>>>>>>>>>> i.e. 1) a record is inserted, 2) the continuous filter confirms the
>>>>>>>>>>>> record's time satisfies your condition, 3) the continuous query
>>>>>>>>>>>> notifies your application, which does the required processing.
>>>>>>>>>>>>
>>>>>>>>>>>> The jobs are better for a batching use case when it's ok to
>>>>>>>>>>>> process records together with some delay.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> -
>>>>>>>>>>>> Denis
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Oct 2, 2020 at 3:50 AM narges saleh <
>>>>>>>>>>>> snarges...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>>  If I want to watch for a rolling timestamp pattern in all the
>>>>>>>>>>>>> records that get inserted to all my caches, is it more efficient
>>>>>>>>>>>>> to use timer based jobs (that checks all the records in some
>>>>>>>>>>>>> interval) or continuous queries that locally filter on the
>>>>>>>>>>>>> pattern? These records can get inserted in any order and some can
>>>>>>>>>>>>> arrive with delays.
>>>>>>>>>>>>> An example is to watch for all the records whose timestamp
>>>>>>>>>>>>> ends in 50, if the timestamp is in the format yyyy-mm-dd hh:mi.
>>>>>>>>>>>>>
>>>>>>>>>>>>> thanks
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> -
>>>>>>>>>> Denis
>>>>>>>>>>
>>>>>>>>>>
