[ignite] branch ignite-invokeAll updated: invokeAll
This is an automated email from the ASF dual-hosted git repository. sboikov pushed a commit to branch ignite-invokeAll in repository https://gitbox.apache.org/repos/asf/ignite.git The following commit(s) were added to refs/heads/ignite-invokeAll by this push: new d7389ae invokeAll d7389ae is described below commit d7389ae54a8ea1c533ff2db73758bbad7a569ccd Author: sboikov AuthorDate: Fri Mar 1 15:22:38 2019 +0300 invokeAll --- .../dht/atomic/DhtAtomicUpdateResult.java | 27 --- .../distributed/dht/atomic/GridDhtAtomicCache.java | 200 - .../dht/atomic/GridDhtAtomicUpdateFuture.java | 2 +- 3 files changed, 115 insertions(+), 114 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/DhtAtomicUpdateResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/DhtAtomicUpdateResult.java index 0aa1676..734a322 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/DhtAtomicUpdateResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/DhtAtomicUpdateResult.java @@ -51,9 +51,6 @@ class DhtAtomicUpdateResult { */ private int processedEntriesCount; -/** */ -private BitSet retryEntries; - /** * */ @@ -163,28 +160,4 @@ class DhtAtomicUpdateResult { public int processedEntriesCount() { return processedEntriesCount; } - -/** - * @param retryEntries Indexes of entries to retry update for. - */ -@Nullable public BitSet retryEntries() { -return retryEntries; -} - -/** - * @param retryEntries Indexes of entries to retry update for. - */ -public void retryEntries(@Nullable BitSet retryEntries) { -this.retryEntries = retryEntries; -} - -/** - * @param idx Entry index in updated request. - */ -public void addRetryEntry(int idx) { -if (retryEntries == null) -retryEntries = new BitSet(); - -retryEntries.set(idx, true); -} } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 1cd8e64..1fece64 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import java.io.Externalizable; import java.util.ArrayList; import java.util.Arrays; -import java.util.BitSet; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -28,7 +27,6 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.TreeMap; import java.util.UUID; import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; @@ -1761,14 +1759,6 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { DhtAtomicUpdateResult updDhtRes = new DhtAtomicUpdateResult(); try { -boolean batchStoreUpdate = req.size() > 1 && // Several keys ... -writeThrough() && !req.skipStore() && // and store is enabled ... -!ctx.store().isLocal() && // and this is not local store (conflict resolver should be used for local store) -!ctx.dr().receiveEnabled(); // and no DR. - -Map> byPart = -batchStoreUpdate ? new HashMap<>() : null; - while (true) { try { GridDhtPartitionTopology top = topology(); @@ -1864,7 +1854,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { } } -update(node, locked, req, res, updDhtRes, byPart); +update(node, locked, req, res, updDhtRes); dhtFut = updDhtRes.dhtFuture(); deleted = updDhtRes.deleted(); @@ -1991,8 +1981,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { List locked, GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res, -DhtAtomicUpdateResult dhtUpdRes, -@Nullable Map> byPart +DhtAtomicUpdateResult dhtUpdRes ) throws GridCacheEntryRemovedException { @@ -2027,7 +2016,12 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter {
[ignite] branch ignite-invokeAll updated: invokeAll
This is an automated email from the ASF dual-hosted git repository. sboikov pushed a commit to branch ignite-invokeAll in repository https://gitbox.apache.org/repos/asf/ignite.git The following commit(s) were added to refs/heads/ignite-invokeAll by this push: new fc3b142 invokeAll fc3b142 is described below commit fc3b14227acdbef8dbeee799a1d430b81b352a8c Author: sboikov AuthorDate: Mon Feb 25 19:41:10 2019 +0300 invokeAll --- .../processors/cache/GridCacheEntryEx.java | 13 +- .../processors/cache/GridCacheMapEntry.java| 120 +-- .../distributed/dht/atomic/GridDhtAtomicCache.java | 389 ++--- .../distributed/near/GridNearAtomicCache.java | 81 +++-- .../processors/cache/GridCacheTestEntryEx.java | 19 +- 5 files changed, 248 insertions(+), 374 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index b93e7f8..85ec6a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -551,28 +551,17 @@ public interface GridCacheEntryEx { */ public GridCacheUpdateAtomicResult innerUpdate( @Nullable GridCacheMapEntry.AtomicCacheUpdateClosure c, -GridCacheVersion ver, +boolean updateOffheap, UUID evtNodeId, UUID affNodeId, GridCacheOperation op, @Nullable Object val, -@Nullable Object[] invokeArgs, -boolean writeThrough, -boolean readThrough, boolean retval, -boolean keepBinary, -@Nullable IgniteCacheExpiryPolicy expiryPlc, boolean evt, boolean metrics, boolean primary, -boolean checkVer, AffinityTopologyVersion topVer, -@Nullable CacheEntryPredicate[] filter, GridDrType drType, -long conflictTtl, -long conflictExpireTime, -@Nullable GridCacheVersion conflictVer, -boolean conflictResolve, boolean intercept, @Nullable UUID subjId, String taskName, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index acd4349..c326924 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -2207,28 +2207,17 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @SuppressWarnings("unchecked") @Override public GridCacheUpdateAtomicResult innerUpdate( AtomicCacheUpdateClosure c, -final GridCacheVersion newVer, +boolean updateStore, final UUID evtNodeId, final UUID affNodeId, final GridCacheOperation op, @Nullable final Object writeObj, -@Nullable final Object[] invokeArgs, -final boolean writeThrough, -final boolean readThrough, final boolean retval, -final boolean keepBinary, -@Nullable final IgniteCacheExpiryPolicy expiryPlc, final boolean evt, final boolean metrics, final boolean primary, -final boolean verCheck, final AffinityTopologyVersion topVer, -@Nullable final CacheEntryPredicate[] filter, final GridDrType drType, -final long explicitTtl, -final long explicitExpireTime, -@Nullable final GridCacheVersion conflictVer, -final boolean conflictResolve, final boolean intercept, @Nullable final UUID subjId, final String taskName, @@ -2255,35 +2244,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme Map lsnrs = cctx.continuousQueries().updateListeners(internal, false); boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM -|| !F.isEmptyOrNulls(filter); - -// Possibly read value from store. -boolean readFromStore = readThrough && needVal && (cctx.readThrough() && -(op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue())); - -if (c == null) { -c = new AtomicCacheUpdateClosure(this, -topVer, -newVer, -op, -writeObj, -invokeArgs, -readFromStore, -writeThrough, -keepBinary, -expiryPlc, -primary, -verCheck, -filter, -
[ignite] branch ignite-invokeAll updated: invokeAll
This is an automated email from the ASF dual-hosted git repository. sboikov pushed a commit to branch ignite-invokeAll in repository https://gitbox.apache.org/repos/asf/ignite.git The following commit(s) were added to refs/heads/ignite-invokeAll by this push: new b0992b3 invokeAll b0992b3 is described below commit b0992b3e103c5ffd9d323126dd132ebced27d97e Author: sboikov AuthorDate: Mon Feb 25 17:32:35 2019 +0300 invokeAll --- .../processors/cache/GridCacheMapEntry.java| 11 +- .../cache/IgniteCacheOffheapManager.java | 7 +- .../cache/IgniteCacheOffheapManagerImpl.java | 27 +- .../distributed/dht/atomic/GridDhtAtomicCache.java | 759 ++--- .../cache/persistence/GridCacheOffheapManager.java | 9 +- .../cache/persistence/tree/BPlusTree.java | 6 +- .../processors/cache/tree/SearchRowEx.java | 45 ++ 7 files changed, 596 insertions(+), 268 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 23b14b6..acd4349 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -2239,7 +2239,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException { assert cctx.atomic() && !detached(); -if (!primary && !isNear()) +if (!primary && !isNear() && c == null) ensureFreeSpace(); if (!primary) { @@ -5903,7 +5903,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme private boolean wasIntercepted; /** */ -AtomicCacheUpdateClosure( +public AtomicCacheUpdateClosure( GridCacheMapEntry entry, AffinityTopologyVersion topVer, GridCacheVersion newVer, @@ -5960,6 +5960,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } +/** + * @return Update result. + */ +public GridCacheUpdateAtomicResult updateResult() { +return updateRes; +} + /** {@inheritDoc} */ @Nullable @Override public CacheDataRow oldRow() { return oldRow; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index 870d99f..9455c2a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.function.Function; import javax.cache.Cache; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; @@ -200,7 +201,7 @@ public interface IgniteCacheOffheapManager { public void invokeAll(GridCacheContext cctx, GridDhtLocalPartition part, Collection rows, -Map map) +Function closures) throws IgniteCheckedException; /** @@ -899,7 +900,7 @@ public interface IgniteCacheOffheapManager { * @return Cache search row. * @throws IgniteCheckedException If failed. */ -public CacheSearchRow createSearchRow(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException; +public CacheSearchRow createSearchRow(GridCacheContext cctx, KeyCacheObject key, Object data) throws IgniteCheckedException; /** * @return Rows comparator. @@ -915,7 +916,7 @@ public interface IgniteCacheOffheapManager { */ public void invokeAll(GridCacheContext cctx, Collection rows, -Map map) +Function closures) throws IgniteCheckedException; /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 3ae5b81..0a621dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import
[ignite] branch ignite-invokeAll updated: invokeAll
This is an automated email from the ASF dual-hosted git repository. sboikov pushed a commit to branch ignite-invokeAll in repository https://gitbox.apache.org/repos/asf/ignite.git The following commit(s) were added to refs/heads/ignite-invokeAll by this push: new e3ec453 invokeAll e3ec453 is described below commit e3ec453d3907e7a9ce9d12dc0ac3f4af8725f390 Author: sboikov AuthorDate: Fri Feb 22 16:40:09 2019 +0300 invokeAll --- .../internal/processors/cache/IgniteCacheOffheapManager.java | 9 - 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index 7822319..870d99f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -190,6 +190,13 @@ public interface IgniteCacheOffheapManager { public void invoke(GridCacheContext cctx, KeyCacheObject key, GridDhtLocalPartition part, OffheapInvokeClosure c) throws IgniteCheckedException; +/** + * @param cctx Cache context. + * @param part Partition. + * @param rows Rows to update sorted according to cache tree sort order. + * @param map Update closures map. + * @throws IgniteCheckedException If failed. + */ public void invokeAll(GridCacheContext cctx, GridDhtLocalPartition part, Collection rows, @@ -902,7 +909,7 @@ public interface IgniteCacheOffheapManager { /** * @param cctx Cache context. - * @param rows Rows sorted according to cache tree sort order.. + * @param rows Rows sorted according to cache tree sort order. * @param map Update closures map. * @throws IgniteCheckedException If failed. */
[ignite] branch ignite-invokeAll updated: invokeAll
This is an automated email from the ASF dual-hosted git repository. sboikov pushed a commit to branch ignite-invokeAll in repository https://gitbox.apache.org/repos/asf/ignite.git The following commit(s) were added to refs/heads/ignite-invokeAll by this push: new 4d21df0 invokeAll 4d21df0 is described below commit 4d21df03398dfffde29ae6e666b07113c3fd2999 Author: sboikov AuthorDate: Fri Feb 22 07:37:43 2019 +0300 invokeAll --- .../cache/distributed/dht/atomic/GridDhtAtomicCache.java | 7 +++ 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 550af11..329165a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -2555,8 +2555,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { GridDrType drType = replicate ? DR_PRIMARY : DR_NONE; -// FIXME size > 1. -if (req.size() >= 1) { +if (req.size() > 1) { Map lsnrs = ctx.continuousQueries().updateListeners(!ctx.userCache(), false); boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM || @@ -2980,8 +2979,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { int cnt = entries.size(); -// FIXME cnt > 1. -Map> byPart = cnt >= 1 ? new HashMap<>() : null; +Map> byPart = +cnt > 1 ? new HashMap<>() : null; // Avoid iterator creation. for (int i = 0; i < cnt; i++) {
[ignite] branch ignite-invokeAll updated: invokeAll
This is an automated email from the ASF dual-hosted git repository. sboikov pushed a commit to branch ignite-invokeAll in repository https://gitbox.apache.org/repos/asf/ignite.git The following commit(s) were added to refs/heads/ignite-invokeAll by this push: new f3f270c invokeAll f3f270c is described below commit f3f270c75eb6ef6d34fd47f58ed119e44c5a38aa Author: sboikov AuthorDate: Thu Feb 21 22:07:00 2019 +0300 invokeAll --- .../distributed/dht/atomic/NearCacheUpdates.java | 25 ++ .../distributed/near/GridNearAtomicCache.java | 11 -- .../apache/ignite/internal/util/GridLongList.java | 10 + 3 files changed, 30 insertions(+), 16 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearCacheUpdates.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearCacheUpdates.java index fe7e5d1..b698531 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearCacheUpdates.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearCacheUpdates.java @@ -98,28 +98,35 @@ public class NearCacheUpdates implements Message { */ void addNearTtl(int keyIdx, long ttl, long expireTime) { if (ttl >= 0) { -if (nearTtls == null) { +if (nearTtls == null) nearTtls = new GridLongList(16); -for (int i = 0; i < keyIdx; i++) -nearTtls.add(-1L); -} +for (int i = nearTtls.size(); i < keyIdx; i++) +nearTtls.add(-1L); } -if (nearTtls != null) -nearTtls.add(ttl); +if (nearTtls != null) { +if (keyIdx < nearTtls.size()) +nearTtls.set(keyIdx, ttl); +else +nearTtls.add(ttl); +} if (expireTime >= 0) { if (nearExpireTimes == null) { nearExpireTimes = new GridLongList(16); -for (int i = 0; i < keyIdx; i++) +for (int i = nearExpireTimes.size(); i < keyIdx; i++) nearExpireTimes.add(-1); } } -if (nearExpireTimes != null) -nearExpireTimes.add(expireTime); +if (nearExpireTimes != null) { +if (keyIdx < nearExpireTimes.size()) +nearExpireTimes.set(keyIdx, expireTime); +else +nearExpireTimes.add(expireTime); +} } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 833401b..a908afa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -170,14 +170,8 @@ public class GridNearAtomicCache extends GridNearCacheAdapter { int nearValIdx = nearValsIdxs != null ? nearValsIdxs.indexOf(i) : -1; -long ttl = -1; -long expireTime = -1; - -if (nearValIdx >= 0) { +if (nearValIdx >= 0) val = res.nearValue(nearValIdx); -ttl = res.nearTtl(nearValIdx); -expireTime = res.nearExpireTime(nearValIdx); -} else { assert req.operation() != TRANSFORM; @@ -185,6 +179,9 @@ public class GridNearAtomicCache extends GridNearCacheAdapter { val = req.value(i); } +long ttl = res.nearTtl(i); +long expireTime = res.nearExpireTime(i); + if (ttl != CU.TTL_NOT_CHANGED && expireTime == CU.EXPIRE_TIME_CALCULATE) expireTime = CU.toExpireTime(ttl); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java index 1c022b0..d91a6bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java @@ -259,6 +259,16 @@ public class GridLongList implements Message, Externalizable { } /** + * @param i Index. + * @param val Value. + */ +public void set(int i, long val) { +assert i < idx; + +arr[i] = val; +} + +/** * @return Size. */ public int size() {
[ignite] branch ignite-invokeAll updated: invokeAll
This is an automated email from the ASF dual-hosted git repository. sboikov pushed a commit to branch ignite-invokeAll in repository https://gitbox.apache.org/repos/asf/ignite.git The following commit(s) were added to refs/heads/ignite-invokeAll by this push: new 62648b3 invokeAll 62648b3 is described below commit 62648b32ec6e2cbb39167bc6711f7de2c4560fc0 Author: sboikov AuthorDate: Thu Feb 21 06:06:44 2019 +0300 invokeAll --- .../processors/cache/GridCacheEntryEx.java | 2 + .../processors/cache/GridCacheMapEntry.java| 630 ++--- .../cache/IgniteCacheOffheapManagerImpl.java | 21 +- .../cache/distributed/dht/GridDhtCacheEntry.java | 2 +- .../dht/atomic/DhtAtomicUpdateResult.java | 34 +- .../distributed/dht/atomic/GridDhtAtomicCache.java | 565 +++--- .../distributed/near/GridNearAtomicCache.java | 18 +- .../processors/cache/tree/CacheDataTree.java | 5 + .../processors/cache/GridCacheTestEntryEx.java | 1 + ...eCacheAtomicWithStoreNearEnabledInvokeTest.java | 30 + .../ignite/testsuites/IgniteCacheTestSuite.java| 2 + 11 files changed, 752 insertions(+), 558 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 7b6ab9e..b93e7f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -512,6 +512,7 @@ public interface GridCacheEntryEx { ) throws IgniteCheckedException, GridCacheEntryRemovedException; /** + * @param c Update closure. * @param ver Cache version to set. Entry will be updated only if current version is less then passed version. * @param evtNodeId Event node ID. * @param affNodeId Affinity node ID. @@ -549,6 +550,7 @@ public interface GridCacheEntryEx { * @throws GridCacheEntryRemovedException If entry is obsolete. */ public GridCacheUpdateAtomicResult innerUpdate( +@Nullable GridCacheMapEntry.AtomicCacheUpdateClosure c, GridCacheVersion ver, UUID evtNodeId, UUID affNodeId, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 5124e6d..23b14b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -385,7 +385,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme /** * @return Local partition that owns this entry. */ -public GridDhtLocalPartition localPartition() { +protected GridDhtLocalPartition localPartition() { return null; } @@ -2203,21 +2203,32 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme invokeRes); } -public GridCacheUpdateAtomicResult finishInnerUpdate( +/** {@inheritDoc} */ +@SuppressWarnings("unchecked") +@Override public GridCacheUpdateAtomicResult innerUpdate( AtomicCacheUpdateClosure c, -Map lsnrs, -boolean needVal, +final GridCacheVersion newVer, final UUID evtNodeId, final UUID affNodeId, final GridCacheOperation op, @Nullable final Object writeObj, @Nullable final Object[] invokeArgs, +final boolean writeThrough, +final boolean readThrough, final boolean retval, +final boolean keepBinary, +@Nullable final IgniteCacheExpiryPolicy expiryPlc, final boolean evt, final boolean metrics, final boolean primary, +final boolean verCheck, final AffinityTopologyVersion topVer, +@Nullable final CacheEntryPredicate[] filter, final GridDrType drType, +final long explicitTtl, +final long explicitExpireTime, +@Nullable final GridCacheVersion conflictVer, +final boolean conflictResolve, final boolean intercept, @Nullable final UUID subjId, final String taskName, @@ -2225,160 +2236,158 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @Nullable final Long updateCntr, @Nullable final GridDhtAtomicAbstractUpdateFuture fut, boolean transformOp -) throws GridCacheEntryRemovedException, IgniteCheckedException -{ -assert lock.isHeldByCurrentThread(); - -boolean internal = !cctx.userCache(); - -GridCacheUpdateAtomicResult updateRes = c.updateRes; +) throws IgniteCheckedException,
[ignite] branch ignite-invokeAll updated: invokeAll
This is an automated email from the ASF dual-hosted git repository. sboikov pushed a commit to branch ignite-invokeAll in repository https://gitbox.apache.org/repos/asf/ignite.git The following commit(s) were added to refs/heads/ignite-invokeAll by this push: new 84b431c invokeAll 84b431c is described below commit 84b431c25ba2e7168f0bc2d4eedaf259e09db564 Author: sboikov AuthorDate: Wed Feb 20 09:23:19 2019 +0300 invokeAll --- .../distributed/dht/atomic/GridDhtAtomicCache.java | 27 ++ .../processors/cache/tree/CacheDataTree.java | 5 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 9c8ae3e..5f694be 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -2663,7 +2663,6 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { c.rdrs, c.reqIdx, updRes, -dhtFut, affAssignment, sndPrevVal); } @@ -2733,7 +2732,6 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { readers, i, updRes, -dhtFut, affAssignment, sndPrevVal); } @@ -2756,11 +2754,12 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { GridDhtCacheEntry.ReaderId[] readers, int idx, GridCacheUpdateAtomicResult updRes, -GridDhtAtomicAbstractUpdateFuture dhtFut, AffinityAssignment affAssignment, boolean sndPrevVal ) throws GridCacheEntryRemovedException { +GridDhtAtomicAbstractUpdateFuture dhtFut = dhtUpdRes.dhtFuture(); + GridCacheOperation op = req.operation(); if (dhtFut != null) { @@ -2858,11 +2857,11 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { dhtUpdRes.returnValue(retVal = new GridCacheReturn(nearNode.isLocal())); retVal.addEntryProcessResult(ctx, -entry.key(), -null, -compRes.get1(), -compRes.get2(), -req.keepBinary()); +entry.key(), +null, +compRes.get1(), +compRes.get2(), +req.keepBinary()); } } else { @@ -2870,13 +2869,11 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { if (retVal == null) { CacheObject ret = updRes.oldValue(); -if (retVal == null) { -dhtUpdRes.returnValue(retVal = new GridCacheReturn(ctx, -nearNode.isLocal(), -req.keepBinary(), -req.returnValue() ? ret : null, -updRes.success())); -} +dhtUpdRes.returnValue(new GridCacheReturn(ctx, +nearNode.isLocal(), +req.keepBinary(), +req.returnValue() ? ret : null, +updRes.success())); } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java index d2b72ec..947b589 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java @@ -117,6 +117,11 @@ public class CacheDataTree extends BPlusTree { rowsComparator = new Comparator() { @Override public int compare(CacheSearchRow row1, CacheSearchRow row2) { try { +int cmp = Integer.compare(row1.hash(), row2.hash()); + +if (cmp != 0) +return cmp; + return compareKeyBytes( row1.key().valueBytes(grp.cacheObjectContext()), row2.key().valueBytes(grp.cacheObjectContext()));
[ignite] branch ignite-invokeAll updated: invokeAll
This is an automated email from the ASF dual-hosted git repository. sboikov pushed a commit to branch ignite-invokeAll in repository https://gitbox.apache.org/repos/asf/ignite.git The following commit(s) were added to refs/heads/ignite-invokeAll by this push: new aaf280f invokeAll aaf280f is described below commit aaf280fa8da9704e4d8cda5954c1c64581674364 Author: sboikov AuthorDate: Mon Feb 18 21:53:36 2019 +0300 invokeAll --- .../processors/cache/IgniteCacheOffheapManager.java| 16 .../distributed/dht/atomic/GridDhtAtomicCache.java | 18 +++--- 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index 4f72e81..7822319 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -886,10 +886,26 @@ public interface IgniteCacheOffheapManager { */ public void invoke(GridCacheContext cctx, KeyCacheObject key, OffheapInvokeClosure c) throws IgniteCheckedException; +/** + * @param cctx Cache context. + * @param key Key. + * @return Cache search row. + * @throws IgniteCheckedException If failed. + */ public CacheSearchRow createSearchRow(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException; +/** + * @return Rows comparator. + * @throws IgniteCheckedException If failed. + */ public Comparator rowsComparator() throws IgniteCheckedException; +/** + * @param cctx Cache context. + * @param rows Rows sorted according to cache tree sort order.. + * @param map Update closures map. + * @throws IgniteCheckedException If failed. + */ public void invokeAll(GridCacheContext cctx, Collection rows, Map map) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index fd31b16..9c8ae3e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -2551,13 +2551,16 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { int cnt = req.size() - dhtUpdRes.processedEntriesCount(); -Map lsnrs = ctx.continuousQueries().updateListeners(!ctx.userCache(), false); - boolean retval = sndPrevVal || req.returnValue(); GridDrType drType = replicate ? DR_PRIMARY : DR_NONE; -if (cnt > 1) { +if (cnt >= 1) { +Map lsnrs = ctx.continuousQueries().updateListeners(!ctx.userCache(), false); + +boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM || +!F.isEmptyOrNulls(req.filter()); + Map> byPart = new HashMap<>(); for (int i = dhtUpdRes.processedEntriesCount(); i < req.size(); i++) { @@ -2582,6 +2585,10 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer; Object writeVal = op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i); +// Possibly read value from store. +boolean readFromStore = !req.skipStore() && needVal && (ctx.readThrough() && +(op == GridCacheOperation.TRANSFORM || ctx.loadPreviousValue())); + AtomicCacheBatchUpdateClosure c = new AtomicCacheBatchUpdateClosure( i, entry, @@ -2590,7 +2597,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { req.operation(), writeVal, req.invokeArguments(), -!req.skipStore(), +readFromStore, writeThrough() && !req.skipStore(), req.keepBinary(), expiry, @@ -2622,9 +2629,6 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { for (Map.Entry e : map.entrySet()) { AtomicCacheBatchUpdateClosure c = e.getValue(); -boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM -