[ignite] branch ignite-invokeAll updated: invokeAll

2019-03-01 Thread sboikov
This is an automated email from the ASF dual-hosted git repository.

sboikov pushed a commit to branch ignite-invokeAll
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-invokeAll by this push:
 new d7389ae  invokeAll
d7389ae is described below

commit d7389ae54a8ea1c533ff2db73758bbad7a569ccd
Author: sboikov 
AuthorDate: Fri Mar 1 15:22:38 2019 +0300

invokeAll
---
 .../dht/atomic/DhtAtomicUpdateResult.java  |  27 ---
 .../distributed/dht/atomic/GridDhtAtomicCache.java | 200 -
 .../dht/atomic/GridDhtAtomicUpdateFuture.java  |   2 +-
 3 files changed, 115 insertions(+), 114 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/DhtAtomicUpdateResult.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/DhtAtomicUpdateResult.java
index 0aa1676..734a322 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/DhtAtomicUpdateResult.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/DhtAtomicUpdateResult.java
@@ -51,9 +51,6 @@ class DhtAtomicUpdateResult {
  */
 private int processedEntriesCount;
 
-/** */
-private BitSet retryEntries;
-
 /**
  *
  */
@@ -163,28 +160,4 @@ class DhtAtomicUpdateResult {
 public int processedEntriesCount() {
 return processedEntriesCount;
 }
-
-/**
- * @param retryEntries Indexes of entries to retry update for.
- */
-@Nullable public BitSet retryEntries() {
-return retryEntries;
-}
-
-/**
- * @param retryEntries Indexes of entries to retry update for.
- */
-public void retryEntries(@Nullable BitSet retryEntries) {
-this.retryEntries = retryEntries;
-}
-
-/**
- * @param idx Entry index in updated request.
- */
-public void addRetryEntry(int idx) {
-if (retryEntries == null)
-retryEntries = new BitSet();
-
-retryEntries.set(idx, true);
-}
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 1cd8e64..1fece64 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -20,7 +20,6 @@ package 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 import java.io.Externalizable;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -28,7 +27,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.UUID;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
@@ -1761,14 +1759,6 @@ public class GridDhtAtomicCache extends 
GridDhtCacheAdapter {
 DhtAtomicUpdateResult  updDhtRes = new DhtAtomicUpdateResult();
 
 try {
-boolean batchStoreUpdate = req.size() > 1 && // Several keys 
...
-writeThrough() && !req.skipStore() && // and store is 
enabled ...
-!ctx.store().isLocal() && // and this is 
not local store (conflict resolver should be used for local store)
-!ctx.dr().receiveEnabled();   // and no DR.
-
-Map> byPart =
-batchStoreUpdate ? new HashMap<>() : null;
-
 while (true) {
 try {
 GridDhtPartitionTopology top = topology();
@@ -1864,7 +1854,7 @@ public class GridDhtAtomicCache extends 
GridDhtCacheAdapter {
 }
 }
 
-update(node, locked, req, res, updDhtRes, 
byPart);
+update(node, locked, req, res, updDhtRes);
 
 dhtFut = updDhtRes.dhtFuture();
 deleted = updDhtRes.deleted();
@@ -1991,8 +1981,7 @@ public class GridDhtAtomicCache extends 
GridDhtCacheAdapter {
 List locked,
 GridNearAtomicAbstractUpdateRequest req,
 GridNearAtomicUpdateResponse res,
-DhtAtomicUpdateResult dhtUpdRes,
-@Nullable Map> byPart
+DhtAtomicUpdateResult dhtUpdRes
 )
 throws GridCacheEntryRemovedException
 {
@@ -2027,7 +2016,12 @@ public class GridDhtAtomicCache extends 
GridDhtCacheAdapter {
 

[ignite] branch ignite-invokeAll updated: invokeAll

2019-02-25 Thread sboikov
This is an automated email from the ASF dual-hosted git repository.

sboikov pushed a commit to branch ignite-invokeAll
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-invokeAll by this push:
 new fc3b142  invokeAll
fc3b142 is described below

commit fc3b14227acdbef8dbeee799a1d430b81b352a8c
Author: sboikov 
AuthorDate: Mon Feb 25 19:41:10 2019 +0300

invokeAll
---
 .../processors/cache/GridCacheEntryEx.java |  13 +-
 .../processors/cache/GridCacheMapEntry.java| 120 +--
 .../distributed/dht/atomic/GridDhtAtomicCache.java | 389 ++---
 .../distributed/near/GridNearAtomicCache.java  |  81 +++--
 .../processors/cache/GridCacheTestEntryEx.java |  19 +-
 5 files changed, 248 insertions(+), 374 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index b93e7f8..85ec6a5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -551,28 +551,17 @@ public interface GridCacheEntryEx {
  */
 public GridCacheUpdateAtomicResult innerUpdate(
 @Nullable GridCacheMapEntry.AtomicCacheUpdateClosure c,
-GridCacheVersion ver,
+boolean updateOffheap,
 UUID evtNodeId,
 UUID affNodeId,
 GridCacheOperation op,
 @Nullable Object val,
-@Nullable Object[] invokeArgs,
-boolean writeThrough,
-boolean readThrough,
 boolean retval,
-boolean keepBinary,
-@Nullable IgniteCacheExpiryPolicy expiryPlc,
 boolean evt,
 boolean metrics,
 boolean primary,
-boolean checkVer,
 AffinityTopologyVersion topVer,
-@Nullable CacheEntryPredicate[] filter,
 GridDrType drType,
-long conflictTtl,
-long conflictExpireTime,
-@Nullable GridCacheVersion conflictVer,
-boolean conflictResolve,
 boolean intercept,
 @Nullable UUID subjId,
 String taskName,
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index acd4349..c326924 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2207,28 +2207,17 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 @SuppressWarnings("unchecked")
 @Override public GridCacheUpdateAtomicResult innerUpdate(
 AtomicCacheUpdateClosure c,
-final GridCacheVersion newVer,
+boolean updateStore,
 final UUID evtNodeId,
 final UUID affNodeId,
 final GridCacheOperation op,
 @Nullable final Object writeObj,
-@Nullable final Object[] invokeArgs,
-final boolean writeThrough,
-final boolean readThrough,
 final boolean retval,
-final boolean keepBinary,
-@Nullable final IgniteCacheExpiryPolicy expiryPlc,
 final boolean evt,
 final boolean metrics,
 final boolean primary,
-final boolean verCheck,
 final AffinityTopologyVersion topVer,
-@Nullable final CacheEntryPredicate[] filter,
 final GridDrType drType,
-final long explicitTtl,
-final long explicitExpireTime,
-@Nullable final GridCacheVersion conflictVer,
-final boolean conflictResolve,
 final boolean intercept,
 @Nullable final UUID subjId,
 final String taskName,
@@ -2255,35 +2244,9 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 Map lsnrs = 
cctx.continuousQueries().updateListeners(internal, false);
 
 boolean needVal = lsnrs != null || intercept || retval || op == 
GridCacheOperation.TRANSFORM
-|| !F.isEmptyOrNulls(filter);
-
-// Possibly read value from store.
-boolean readFromStore = readThrough && needVal && 
(cctx.readThrough() &&
-(op == GridCacheOperation.TRANSFORM || 
cctx.loadPreviousValue()));
-
-if (c == null) {
-c = new AtomicCacheUpdateClosure(this,
-topVer,
-newVer,
-op,
-writeObj,
-invokeArgs,
-readFromStore,
-writeThrough,
-keepBinary,
-expiryPlc,
-primary,
-verCheck,
-filter,
- 

[ignite] branch ignite-invokeAll updated: invokeAll

2019-02-25 Thread sboikov
This is an automated email from the ASF dual-hosted git repository.

sboikov pushed a commit to branch ignite-invokeAll
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-invokeAll by this push:
 new b0992b3  invokeAll
b0992b3 is described below

commit b0992b3e103c5ffd9d323126dd132ebced27d97e
Author: sboikov 
AuthorDate: Mon Feb 25 17:32:35 2019 +0300

invokeAll
---
 .../processors/cache/GridCacheMapEntry.java|  11 +-
 .../cache/IgniteCacheOffheapManager.java   |   7 +-
 .../cache/IgniteCacheOffheapManagerImpl.java   |  27 +-
 .../distributed/dht/atomic/GridDhtAtomicCache.java | 759 ++---
 .../cache/persistence/GridCacheOffheapManager.java |   9 +-
 .../cache/persistence/tree/BPlusTree.java  |   6 +-
 .../processors/cache/tree/SearchRowEx.java |  45 ++
 7 files changed, 596 insertions(+), 268 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 23b14b6..acd4349 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2239,7 +2239,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 ) throws IgniteCheckedException, GridCacheEntryRemovedException, 
GridClosureException {
 assert cctx.atomic() && !detached();
 
-if (!primary && !isNear())
+if (!primary && !isNear() && c == null)
 ensureFreeSpace();
 
 if (!primary) {
@@ -5903,7 +5903,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 private boolean wasIntercepted;
 
 /** */
-AtomicCacheUpdateClosure(
+public AtomicCacheUpdateClosure(
 GridCacheMapEntry entry,
 AffinityTopologyVersion topVer,
 GridCacheVersion newVer,
@@ -5960,6 +5960,13 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 }
 }
 
+/**
+ * @return Update result.
+ */
+public GridCacheUpdateAtomicResult updateResult() {
+return updateRes;
+}
+
 /** {@inheritDoc} */
 @Nullable @Override public CacheDataRow oldRow() {
 return oldRow;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 870d99f..9455c2a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 import javax.cache.Cache;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
@@ -200,7 +201,7 @@ public interface IgniteCacheOffheapManager {
 public void invokeAll(GridCacheContext cctx,
 GridDhtLocalPartition part,
 Collection rows,
-Map map)
+Function closures)
 throws IgniteCheckedException;
 
 /**
@@ -899,7 +900,7 @@ public interface IgniteCacheOffheapManager {
  * @return Cache search row.
  * @throws IgniteCheckedException If failed.
  */
-public CacheSearchRow createSearchRow(GridCacheContext cctx, 
KeyCacheObject key) throws IgniteCheckedException;
+public CacheSearchRow createSearchRow(GridCacheContext cctx, 
KeyCacheObject key, Object data) throws IgniteCheckedException;
 
 /**
  * @return Rows comparator.
@@ -915,7 +916,7 @@ public interface IgniteCacheOffheapManager {
  */
 public void invokeAll(GridCacheContext cctx,
 Collection rows,
-Map map)
+Function closures)
 throws IgniteCheckedException;
 
 /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 3ae5b81..0a621dc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 import 

[ignite] branch ignite-invokeAll updated: invokeAll

2019-02-22 Thread sboikov
This is an automated email from the ASF dual-hosted git repository.

sboikov pushed a commit to branch ignite-invokeAll
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-invokeAll by this push:
 new e3ec453  invokeAll
e3ec453 is described below

commit e3ec453d3907e7a9ce9d12dc0ac3f4af8725f390
Author: sboikov 
AuthorDate: Fri Feb 22 16:40:09 2019 +0300

invokeAll
---
 .../internal/processors/cache/IgniteCacheOffheapManager.java | 9 -
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 7822319..870d99f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -190,6 +190,13 @@ public interface IgniteCacheOffheapManager {
 public void invoke(GridCacheContext cctx, KeyCacheObject key, 
GridDhtLocalPartition part, OffheapInvokeClosure c)
 throws IgniteCheckedException;
 
+/**
+ * @param cctx Cache context.
+ * @param part Partition.
+ * @param rows Rows to update sorted according to cache tree sort order.
+ * @param map Update closures map.
+ * @throws IgniteCheckedException If failed.
+ */
 public void invokeAll(GridCacheContext cctx,
 GridDhtLocalPartition part,
 Collection rows,
@@ -902,7 +909,7 @@ public interface IgniteCacheOffheapManager {
 
 /**
  * @param cctx Cache context.
- * @param rows Rows sorted according to cache tree sort order..
+ * @param rows Rows sorted according to cache tree sort order.
  * @param map Update closures map.
  * @throws IgniteCheckedException If failed.
  */



[ignite] branch ignite-invokeAll updated: invokeAll

2019-02-21 Thread sboikov
This is an automated email from the ASF dual-hosted git repository.

sboikov pushed a commit to branch ignite-invokeAll
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-invokeAll by this push:
 new 4d21df0  invokeAll
4d21df0 is described below

commit 4d21df03398dfffde29ae6e666b07113c3fd2999
Author: sboikov 
AuthorDate: Fri Feb 22 07:37:43 2019 +0300

invokeAll
---
 .../cache/distributed/dht/atomic/GridDhtAtomicCache.java   | 7 +++
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 550af11..329165a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -2555,8 +2555,7 @@ public class GridDhtAtomicCache extends 
GridDhtCacheAdapter {
 
 GridDrType drType = replicate ? DR_PRIMARY : DR_NONE;
 
-// FIXME size > 1.
-if (req.size() >= 1) {
+if (req.size() > 1) {
 Map lsnrs = 
ctx.continuousQueries().updateListeners(!ctx.userCache(), false);
 
 boolean needVal = lsnrs != null || intercept || retval || op == 
GridCacheOperation.TRANSFORM ||
@@ -2980,8 +2979,8 @@ public class GridDhtAtomicCache extends 
GridDhtCacheAdapter {
 
 int cnt = entries.size();
 
-// FIXME cnt > 1.
-Map> byPart = cnt >= 1 ? new HashMap<>() : null;
+Map> byPart =
+cnt > 1 ? new HashMap<>() : null;
 
 // Avoid iterator creation.
 for (int i = 0; i < cnt; i++) {



[ignite] branch ignite-invokeAll updated: invokeAll

2019-02-21 Thread sboikov
This is an automated email from the ASF dual-hosted git repository.

sboikov pushed a commit to branch ignite-invokeAll
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-invokeAll by this push:
 new f3f270c  invokeAll
f3f270c is described below

commit f3f270c75eb6ef6d34fd47f58ed119e44c5a38aa
Author: sboikov 
AuthorDate: Thu Feb 21 22:07:00 2019 +0300

invokeAll
---
 .../distributed/dht/atomic/NearCacheUpdates.java   | 25 ++
 .../distributed/near/GridNearAtomicCache.java  | 11 --
 .../apache/ignite/internal/util/GridLongList.java  | 10 +
 3 files changed, 30 insertions(+), 16 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearCacheUpdates.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearCacheUpdates.java
index fe7e5d1..b698531 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearCacheUpdates.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearCacheUpdates.java
@@ -98,28 +98,35 @@ public class NearCacheUpdates implements Message {
  */
 void addNearTtl(int keyIdx, long ttl, long expireTime) {
 if (ttl >= 0) {
-if (nearTtls == null) {
+if (nearTtls == null)
 nearTtls = new GridLongList(16);
 
-for (int i = 0; i < keyIdx; i++)
-nearTtls.add(-1L);
-}
+for (int i = nearTtls.size(); i < keyIdx; i++)
+nearTtls.add(-1L);
 }
 
-if (nearTtls != null)
-nearTtls.add(ttl);
+if (nearTtls != null) {
+if (keyIdx < nearTtls.size())
+nearTtls.set(keyIdx, ttl);
+else
+nearTtls.add(ttl);
+}
 
 if (expireTime >= 0) {
 if (nearExpireTimes == null) {
 nearExpireTimes = new GridLongList(16);
 
-for (int i = 0; i < keyIdx; i++)
+for (int i = nearExpireTimes.size(); i < keyIdx; i++)
 nearExpireTimes.add(-1);
 }
 }
 
-if (nearExpireTimes != null)
-nearExpireTimes.add(expireTime);
+if (nearExpireTimes != null) {
+if (keyIdx < nearExpireTimes.size())
+nearExpireTimes.set(keyIdx, expireTime);
+else
+nearExpireTimes.add(expireTime);
+}
 }
 
 /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 833401b..a908afa 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -170,14 +170,8 @@ public class GridNearAtomicCache extends 
GridNearCacheAdapter {
 
 int nearValIdx = nearValsIdxs != null ? nearValsIdxs.indexOf(i) : 
-1;
 
-long ttl = -1;
-long expireTime = -1;
-
-if (nearValIdx >= 0) {
+if (nearValIdx >= 0)
 val = res.nearValue(nearValIdx);
-ttl = res.nearTtl(nearValIdx);
-expireTime = res.nearExpireTime(nearValIdx);
-}
 else {
 assert req.operation() != TRANSFORM;
 
@@ -185,6 +179,9 @@ public class GridNearAtomicCache extends 
GridNearCacheAdapter {
 val = req.value(i);
 }
 
+long ttl = res.nearTtl(i);
+long expireTime = res.nearExpireTime(i);
+
 if (ttl != CU.TTL_NOT_CHANGED && expireTime == 
CU.EXPIRE_TIME_CALCULATE)
 expireTime = CU.toExpireTime(ttl);
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java
index 1c022b0..d91a6bb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java
@@ -259,6 +259,16 @@ public class GridLongList implements Message, 
Externalizable {
 }
 
 /**
+ * @param i Index.
+ * @param val Value.
+ */
+public void set(int i, long val) {
+assert i < idx;
+
+arr[i] = val;
+}
+
+/**
  * @return Size.
  */
 public int size() {



[ignite] branch ignite-invokeAll updated: invokeAll

2019-02-21 Thread sboikov
This is an automated email from the ASF dual-hosted git repository.

sboikov pushed a commit to branch ignite-invokeAll
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-invokeAll by this push:
 new 62648b3  invokeAll
62648b3 is described below

commit 62648b32ec6e2cbb39167bc6711f7de2c4560fc0
Author: sboikov 
AuthorDate: Thu Feb 21 06:06:44 2019 +0300

invokeAll
---
 .../processors/cache/GridCacheEntryEx.java |   2 +
 .../processors/cache/GridCacheMapEntry.java| 630 ++---
 .../cache/IgniteCacheOffheapManagerImpl.java   |  21 +-
 .../cache/distributed/dht/GridDhtCacheEntry.java   |   2 +-
 .../dht/atomic/DhtAtomicUpdateResult.java  |  34 +-
 .../distributed/dht/atomic/GridDhtAtomicCache.java | 565 +++---
 .../distributed/near/GridNearAtomicCache.java  |  18 +-
 .../processors/cache/tree/CacheDataTree.java   |   5 +
 .../processors/cache/GridCacheTestEntryEx.java |   1 +
 ...eCacheAtomicWithStoreNearEnabledInvokeTest.java |  30 +
 .../ignite/testsuites/IgniteCacheTestSuite.java|   2 +
 11 files changed, 752 insertions(+), 558 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 7b6ab9e..b93e7f8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -512,6 +512,7 @@ public interface GridCacheEntryEx {
 ) throws IgniteCheckedException, GridCacheEntryRemovedException;
 
 /**
+ * @param c Update closure.
  * @param ver Cache version to set. Entry will be updated only if current 
version is less then passed version.
  * @param evtNodeId Event node ID.
  * @param affNodeId Affinity node ID.
@@ -549,6 +550,7 @@ public interface GridCacheEntryEx {
  * @throws GridCacheEntryRemovedException If entry is obsolete.
  */
 public GridCacheUpdateAtomicResult innerUpdate(
+@Nullable GridCacheMapEntry.AtomicCacheUpdateClosure c,
 GridCacheVersion ver,
 UUID evtNodeId,
 UUID affNodeId,
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 5124e6d..23b14b6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -385,7 +385,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 /**
  * @return Local partition that owns this entry.
  */
-public GridDhtLocalPartition localPartition() {
+protected GridDhtLocalPartition localPartition() {
 return null;
 }
 
@@ -2203,21 +2203,32 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 invokeRes);
 }
 
-public GridCacheUpdateAtomicResult finishInnerUpdate(
+/** {@inheritDoc} */
+@SuppressWarnings("unchecked")
+@Override public GridCacheUpdateAtomicResult innerUpdate(
 AtomicCacheUpdateClosure c,
-Map lsnrs,
-boolean needVal,
+final GridCacheVersion newVer,
 final UUID evtNodeId,
 final UUID affNodeId,
 final GridCacheOperation op,
 @Nullable final Object writeObj,
 @Nullable final Object[] invokeArgs,
+final boolean writeThrough,
+final boolean readThrough,
 final boolean retval,
+final boolean keepBinary,
+@Nullable final IgniteCacheExpiryPolicy expiryPlc,
 final boolean evt,
 final boolean metrics,
 final boolean primary,
+final boolean verCheck,
 final AffinityTopologyVersion topVer,
+@Nullable final CacheEntryPredicate[] filter,
 final GridDrType drType,
+final long explicitTtl,
+final long explicitExpireTime,
+@Nullable final GridCacheVersion conflictVer,
+final boolean conflictResolve,
 final boolean intercept,
 @Nullable final UUID subjId,
 final String taskName,
@@ -2225,160 +2236,158 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 @Nullable final Long updateCntr,
 @Nullable final GridDhtAtomicAbstractUpdateFuture fut,
 boolean transformOp
-) throws GridCacheEntryRemovedException, IgniteCheckedException
-{
-assert lock.isHeldByCurrentThread();
-
-boolean internal = !cctx.userCache();
-
-GridCacheUpdateAtomicResult updateRes = c.updateRes;
+) throws IgniteCheckedException, 

[ignite] branch ignite-invokeAll updated: invokeAll

2019-02-19 Thread sboikov
This is an automated email from the ASF dual-hosted git repository.

sboikov pushed a commit to branch ignite-invokeAll
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-invokeAll by this push:
 new 84b431c  invokeAll
84b431c is described below

commit 84b431c25ba2e7168f0bc2d4eedaf259e09db564
Author: sboikov 
AuthorDate: Wed Feb 20 09:23:19 2019 +0300

invokeAll
---
 .../distributed/dht/atomic/GridDhtAtomicCache.java | 27 ++
 .../processors/cache/tree/CacheDataTree.java   |  5 
 2 files changed, 17 insertions(+), 15 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 9c8ae3e..5f694be 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -2663,7 +2663,6 @@ public class GridDhtAtomicCache extends 
GridDhtCacheAdapter {
 c.rdrs,
 c.reqIdx,
 updRes,
-dhtFut,
 affAssignment,
 sndPrevVal);
 }
@@ -2733,7 +2732,6 @@ public class GridDhtAtomicCache extends 
GridDhtCacheAdapter {
 readers,
 i,
 updRes,
-dhtFut,
 affAssignment,
 sndPrevVal);
 }
@@ -2756,11 +2754,12 @@ public class GridDhtAtomicCache extends 
GridDhtCacheAdapter {
 GridDhtCacheEntry.ReaderId[] readers,
 int idx,
 GridCacheUpdateAtomicResult updRes,
-GridDhtAtomicAbstractUpdateFuture dhtFut,
 AffinityAssignment affAssignment,
 boolean sndPrevVal
 ) throws GridCacheEntryRemovedException
 {
+GridDhtAtomicAbstractUpdateFuture dhtFut = dhtUpdRes.dhtFuture();
+
 GridCacheOperation op = req.operation();
 
 if (dhtFut != null) {
@@ -2858,11 +2857,11 @@ public class GridDhtAtomicCache extends 
GridDhtCacheAdapter {
 dhtUpdRes.returnValue(retVal = new 
GridCacheReturn(nearNode.isLocal()));
 
 retVal.addEntryProcessResult(ctx,
-entry.key(),
-null,
-compRes.get1(),
-compRes.get2(),
-req.keepBinary());
+entry.key(),
+null,
+compRes.get1(),
+compRes.get2(),
+req.keepBinary());
 }
 }
 else {
@@ -2870,13 +2869,11 @@ public class GridDhtAtomicCache extends 
GridDhtCacheAdapter {
 if (retVal == null) {
 CacheObject ret = updRes.oldValue();
 
-if (retVal == null) {
-dhtUpdRes.returnValue(retVal = new GridCacheReturn(ctx,
-nearNode.isLocal(),
-req.keepBinary(),
-req.returnValue() ? ret : null,
-updRes.success()));
-}
+dhtUpdRes.returnValue(new GridCacheReturn(ctx,
+nearNode.isLocal(),
+req.keepBinary(),
+req.returnValue() ? ret : null,
+updRes.success()));
 }
 }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
index d2b72ec..947b589 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
@@ -117,6 +117,11 @@ public class CacheDataTree extends 
BPlusTree {
 rowsComparator = new Comparator() {
 @Override public int compare(CacheSearchRow row1, CacheSearchRow 
row2) {
 try {
+int cmp = Integer.compare(row1.hash(), row2.hash());
+
+if (cmp != 0)
+return cmp;
+
 return compareKeyBytes(
 row1.key().valueBytes(grp.cacheObjectContext()),
 row2.key().valueBytes(grp.cacheObjectContext()));



[ignite] branch ignite-invokeAll updated: invokeAll

2019-02-18 Thread sboikov
This is an automated email from the ASF dual-hosted git repository.

sboikov pushed a commit to branch ignite-invokeAll
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-invokeAll by this push:
 new aaf280f  invokeAll
aaf280f is described below

commit aaf280fa8da9704e4d8cda5954c1c64581674364
Author: sboikov 
AuthorDate: Mon Feb 18 21:53:36 2019 +0300

invokeAll
---
 .../processors/cache/IgniteCacheOffheapManager.java| 16 
 .../distributed/dht/atomic/GridDhtAtomicCache.java | 18 +++---
 2 files changed, 27 insertions(+), 7 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 4f72e81..7822319 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -886,10 +886,26 @@ public interface IgniteCacheOffheapManager {
  */
 public void invoke(GridCacheContext cctx, KeyCacheObject key, 
OffheapInvokeClosure c) throws IgniteCheckedException;
 
+/**
+ * @param cctx Cache context.
+ * @param key Key.
+ * @return Cache search row.
+ * @throws IgniteCheckedException If failed.
+ */
 public CacheSearchRow createSearchRow(GridCacheContext cctx, 
KeyCacheObject key) throws IgniteCheckedException;
 
+/**
+ * @return Rows comparator.
+ * @throws IgniteCheckedException If failed.
+ */
 public Comparator rowsComparator() throws 
IgniteCheckedException;
 
+/**
+ * @param cctx Cache context.
+ * @param rows Rows sorted according to cache tree sort order..
+ * @param map Update closures map.
+ * @throws IgniteCheckedException If failed.
+ */
 public void invokeAll(GridCacheContext cctx,
 Collection rows,
 Map map)
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index fd31b16..9c8ae3e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -2551,13 +2551,16 @@ public class GridDhtAtomicCache extends 
GridDhtCacheAdapter {
 
 int cnt = req.size() - dhtUpdRes.processedEntriesCount();
 
-Map lsnrs = 
ctx.continuousQueries().updateListeners(!ctx.userCache(), false);
-
 boolean retval = sndPrevVal || req.returnValue();
 
 GridDrType drType = replicate ? DR_PRIMARY : DR_NONE;
 
-if (cnt > 1) {
+if (cnt >= 1) {
+Map lsnrs = 
ctx.continuousQueries().updateListeners(!ctx.userCache(), false);
+
+boolean needVal = lsnrs != null || intercept || retval || op == 
GridCacheOperation.TRANSFORM ||
+!F.isEmptyOrNulls(req.filter());
+
 Map> byPart = new HashMap<>();
 
 for (int i = dhtUpdRes.processedEntriesCount(); i < req.size(); 
i++) {
@@ -2582,6 +2585,10 @@ public class GridDhtAtomicCache extends 
GridDhtCacheAdapter {
 assert !(newConflictVer instanceof GridCacheVersionEx) : 
newConflictVer;
 Object writeVal = op == TRANSFORM ? req.entryProcessor(i) 
: req.writeValue(i);
 
+// Possibly read value from store.
+boolean readFromStore = !req.skipStore() && needVal && 
(ctx.readThrough() &&
+(op == GridCacheOperation.TRANSFORM || 
ctx.loadPreviousValue()));
+
 AtomicCacheBatchUpdateClosure c = new 
AtomicCacheBatchUpdateClosure(
 i,
 entry,
@@ -2590,7 +2597,7 @@ public class GridDhtAtomicCache extends 
GridDhtCacheAdapter {
 req.operation(),
 writeVal,
 req.invokeArguments(),
-!req.skipStore(),
+readFromStore,
 writeThrough() && !req.skipStore(),
 req.keepBinary(),
 expiry,
@@ -2622,9 +2629,6 @@ public class GridDhtAtomicCache extends 
GridDhtCacheAdapter {
 for (Map.Entry e : map.entrySet()) {
 AtomicCacheBatchUpdateClosure c = e.getValue();
 
-boolean needVal = lsnrs != null || intercept || retval 
|| op == GridCacheOperation.TRANSFORM
-