Hi,

The fact that code from invoke(...) is executed on node that initiated
transaction ("near node" in ignite terminology) is a known issue. There is
a ticket for it (https://issues.apache.org/jira/browse/IGNITE-3471), but it
hasn't been fixed yet.

To solve your initial goal, you might want to start transaction on the
primary node for your key. It can be achieved by using
ignite.compute().affinityRun(...), but in this case you have to start
transaction inside affinityRun closure.

Like this:
ignite.compute().affinityRun(cacheName, key,
            () -> {
                try (Transaction tx =
Ignition.ignite().transactions().txStart(...)) {
                    cache.invoke(key, entryProcessor);

                    tx.commit();
                }
            }
    );
}

In this case you will minimize overhead to modify entry in cache -
entryProcessor will be executed only on nodes that own the key, and stored
value shouldn't be transferred between nodes at all.

Hope this helps.



On Mon, Feb 12, 2018 at 1:57 PM, Prasad Bhalerao <
prasadbhalerao1...@gmail.com> wrote:

> Hi,
>
> I am trying to test the distributed transaction support using following
> piece of code. While debugging the code I observed that code executes on
> client node first and after doing commit the code executes on a node which
> owns that kay.
>
> What I am trying to do is, to collocate the data to avoid the network call
> as my data in real use case is going to big. But while debugging the code,
> I observed that entry processor first executes on client node, gets all the
> data executes the task. and after commit executes the same code on remote
> node.
>
> Can someone please explain this behavior? My use case to execute the task
> on nodes which owns the data in single transaction.
>
> private static void executeEntryProcessorTransaction(IgniteCache<Long,
> Person> cache) {
>     Person val=null;
>     try (Transaction tx = Ignition.ignite().transactions().txStart(
> TransactionConcurrency.OPTIMISTIC,TransactionIsolation.SERIALIZABLE)) {
>   long myid =6l;
>         CacheEntryProcessor entryProcessor = new MyEntryProcessor();
>         cache.invoke(myid, entryProcessor);
>         System.out.println("Overwrote old value: " + val);
>         val = cache.get(myid);
>         System.out.println("Read value: " + val);
>
>         tx.commit();
>         System.out.println("Read value after commit: " +
> cache.get(myid));
>     }
> }
>
>
>
> Thanks,
> Prasad
>



-- 
Best regards,
Ilya

Reply via email to