Repository: ignite Updated Branches: refs/heads/ignite-gg-10837 93656982b -> 880435751
IGNITE-GG-10837 Fixed review notes. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/88043575 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/88043575 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/88043575 Branch: refs/heads/ignite-gg-10837 Commit: 88043575166c1a54e19c6a4eae8741bc626cad8a Parents: 9365698 Author: nikolay_tikhonov <ntikho...@gridgain.com> Authored: Thu Dec 17 13:28:16 2015 +0300 Committer: nikolay_tikhonov <ntikho...@gridgain.com> Committed: Thu Dec 17 13:28:16 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 17 ----- .../processors/cache/GridCacheProxyImpl.java | 6 -- .../processors/cache/IgniteCacheProxy.java | 72 ++++++++++++++++++++ .../processors/cache/IgniteInternalCache.java | 11 --- .../dht/atomic/GridDhtAtomicCache.java | 49 +++++++------ .../distributed/near/GridNearAtomicCache.java | 6 -- .../cache/dr/GridCacheDrExpirationInfo.java | 15 ---- .../transactions/IgniteTxLocalAdapter.java | 48 +++++-------- .../cache/version/GridCacheVersionManager.java | 18 ++--- 9 files changed, 121 insertions(+), 121 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/88043575/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 914797f..a8769b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -2094,23 +2094,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> invokeAllConflictAsync(final Map<KeyCacheObject, GridCacheDrInfo> map, - final Object... args) throws IgniteCheckedException { - if (F.isEmpty(map)) - return new GridFinishedFuture<Object>(); - - return asyncOp(new AsyncInOp(map.keySet()) { - @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) { - return tx.invokeAllDrAsync(ctx, map, args); - } - - @Override public String toString() { - return "invokeAllDrAsync [drMap=" + map + ']'; - } - }); - } - - /** {@inheritDoc} */ @Override public <T> EntryProcessorResult<T> invoke(final K key, final EntryProcessor<K, V, T> entryProcessor, final Object... args) http://git-wip-us.apache.org/repos/asf/ignite/blob/88043575/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 3753a9f..1f760e8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -536,12 +536,6 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> invokeAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> map, - Object... args) throws IgniteCheckedException { - return null; - } - - /** {@inheritDoc} */ @Override public <T> EntryProcessorResult<T> invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/ignite/blob/88043575/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 1768ecf..44c87a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -64,8 +64,10 @@ import org.apache.ignite.internal.AsyncSupportAdapter; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.query.CacheQuery; import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; import org.apache.ignite.internal.util.GridEmptyIterator; @@ -1079,6 +1081,76 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } } + /** + * Put entries to cache with conflict resolution. + * + * @param conflictMap Conflict map. + */ + public void putAllConflict(Map<KeyCacheObject, GridCacheDrInfo> conflictMap) { + try { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + delegate.putAllConflict(conflictMap); + } + finally { + onLeave(gate, prev); + } + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** + * Remove entries from cache with conflict resolution. + * + * @param conflictMap Conflict map. + */ + public void removeAllConflict(Map<KeyCacheObject, GridCacheVersion> conflictMap) { + try { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + delegate.removeAllConflict(conflictMap); + } + finally { + onLeave(gate, prev); + } + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** + * Invoke entries from cache with conflict resolution. + * + * @param map Conflict map. + * @param args Invoke arguments. + */ + public void invokeAllConflict(Map<KeyCacheObject, GridCacheDrInfo> map, Object... args) { + try { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + delegate.invokeAllConflict(map, args); + } + finally { + onLeave(gate, prev); + } + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + /** {@inheritDoc} */ @Override public V getAndPut(K key, V val) { try { http://git-wip-us.apache.org/repos/asf/ignite/blob/88043575/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java index 61dc13b..3c72172 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java @@ -1568,17 +1568,6 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> { throws IgniteCheckedException; /** - * Invoke async with conflict resolution. - * - * @param map Map containing keys and entry processors to be applied to values. - * @param args Arguments. - * @return Invoke results. - * @throws IgniteCheckedException If failed. - */ - public IgniteInternalFuture<?> invokeAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> map, Object... args) - throws IgniteCheckedException; - - /** * Removes DR data. * * @param drMap DR map. http://git-wip-us.apache.org/repos/asf/ignite/blob/88043575/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 4f2f8f7..73070ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -421,7 +421,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { true, false, filter, - true); + true, + UPDATE); } /** {@inheritDoc} */ @@ -437,7 +438,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { false, false, filter, - true); + true, + UPDATE); } /** {@inheritDoc} */ @@ -452,7 +454,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { true, false, ctx.noValArray(), - false).get(); + false, + UPDATE).get(); } /** {@inheritDoc} */ @@ -544,7 +547,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { true, true, ctx.equalsValArray(oldVal), - true); + true, + UPDATE); } /** {@inheritDoc} */ @@ -562,7 +566,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { false, false, CU.empty0(), - true).chain(RET2NULL); + true, + UPDATE).chain(RET2NULL); } /** {@inheritDoc} */ @@ -583,20 +588,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { false, false, null, - true); + true, + UPDATE); } /** {@inheritDoc} */ - public void invokeAllConflict(Map<KeyCacheObject, GridCacheDrInfo> map, Object... args) + public void invokeAllConflict(Map<KeyCacheObject, GridCacheDrInfo> conflictMap, Object... args) throws IgniteCheckedException { - invokeAllConflictAsync(map, args).get(); - } - - /** {@inheritDoc} */ - public IgniteInternalFuture<?> invokeAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> conflictMap, - Object... args) throws IgniteCheckedException { - - return updateAllAsync0(null, + updateAllAsync0(null, null, args, conflictMap, @@ -604,7 +603,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { false, false, null, - true); + true, + TRANSFORM).get(); } /** {@inheritDoc} */ @@ -784,7 +784,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { false, false, null, - true); + true, + TRANSFORM); return fut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() { @Override public EntryProcessorResult<T> applyx(IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut) @@ -840,7 +841,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { false, false, null, - true); + true, + TRANSFORM); return resFut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, Map<K, EntryProcessorResult<T>>>() { @Override public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut) throws IgniteCheckedException { @@ -876,7 +878,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { false, false, null, - true); + true, + TRANSFORM); } /** @@ -903,7 +906,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final boolean retval, final boolean rawRetval, @Nullable final CacheEntryPredicate[] filter, - final boolean waitTopFut + final boolean waitTopFut, + final GridCacheOperation op ) { assert ctx.updatesAllowed(); @@ -918,11 +922,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { int taskNameHash = ctx.kernalContext().job().currentTaskNameHash(); - GridCacheOperation op = invokeMap != null ? TRANSFORM : UPDATE; - - if (op == UPDATE && conflictPutMap != null && !conflictPutMap.isEmpty()) - op = F.firstEntry(conflictPutMap).getValue().entryProcessor() != null ? TRANSFORM : UPDATE; - final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture( ctx, this, http://git-wip-us.apache.org/repos/asf/ignite/blob/88043575/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 335266d..a3f398e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -551,12 +551,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> invokeAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> map, - Object... args) throws IgniteCheckedException { - return dht.invokeAllConflictAsync(map, args); - } - - /** {@inheritDoc} */ @Override public <T> EntryProcessorResult<T> invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/ignite/blob/88043575/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrExpirationInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrExpirationInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrExpirationInfo.java index c5f645f..976dea2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrExpirationInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrExpirationInfo.java @@ -57,21 +57,6 @@ public class GridCacheDrExpirationInfo extends GridCacheDrInfo { this.expireTime = expireTime; } - /** - * Constructor. - * - * @param proc Entry processor. - * @param ver Version. - * @param ttl TTL. - * @param expireTime Expire time. - */ - public GridCacheDrExpirationInfo(EntryProcessor proc, GridCacheVersion ver, long ttl, long expireTime) { - super(proc, ver); - - this.ttl = ttl; - this.expireTime = expireTime; - } - /** {@inheritDoc} */ @Override public long ttl() { return ttl; http://git-wip-us.apache.org/repos/asf/ignite/blob/88043575/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 8e7069d..9001fb5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1969,8 +1969,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter GridCacheContext cacheCtx, Map<KeyCacheObject, GridCacheDrInfo> drMap ) { + Map<KeyCacheObject, Object> map = F.viewReadOnly(drMap, new IgniteClosure<GridCacheDrInfo, Object>() { + @Override public Object apply(GridCacheDrInfo val) { + return val.value(); + } + }); + return this.<Object, Object>putAllAsync0(cacheCtx, - null, + map, null, null, drMap, @@ -1983,15 +1989,16 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter Map<KeyCacheObject, GridCacheDrInfo> drMap, Object... args ) { - Map<Object, EntryProcessor<Object, Object, Object>> invoke = new LinkedHashMap<>(); - - for (Map.Entry<KeyCacheObject, GridCacheDrInfo> e : drMap.entrySet()) - if (e.getValue().entryProcessor() != null) - invoke.put(e.getKey(), e.getValue().entryProcessor()); + Map<Object, EntryProcessor<Object, Object, Object>> invokeMap = + F.viewReadOnly(drMap, (IgniteClosure)new IgniteClosure<Object, EntryProcessor<Object, Object, Object>>() { + @Override public EntryProcessor<Object, Object, Object> apply(Object val) { + return (EntryProcessor<Object, Object, Object>)((GridCacheDrInfo)val).entryProcessor(); + } + }); return this.<Object, Object>putAllAsync0(cacheCtx, null, - invoke, + invokeMap, args, drMap, true, @@ -3076,31 +3083,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } // Cached entry may be passed only from entry wrapper. - final Map<?, ?> map0; - final Map<?, EntryProcessor<K, V, Object>> invokeMap0; - - if (drMap != null) { - assert map == null; - - if (invokeMap == null) { - map0 = F.viewReadOnly(drMap, new IgniteClosure<GridCacheDrInfo, Object>() { - @Override public Object apply(GridCacheDrInfo val) { - return val.value(); - } - }); - - invokeMap0 = null; - } - else { - map0 = null; - - invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap; - } - } - else { - map0 = map; - invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap; - } + final Map<?, ?> map0 = map; + final Map<?, EntryProcessor<K, V, Object>> invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap; if (log.isDebugEnabled()) log.debug("Called putAllAsync(...) [tx=" + this + ", map=" + map0 + ", retval=" + retval + "]"); http://git-wip-us.apache.org/repos/asf/ignite/blob/88043575/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java index b5fc4ca..166c713 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java @@ -176,7 +176,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { * @return Next version based on current topology. */ public GridCacheVersion next() { - return next(cctx.kernalContext().discovery().topologyVersion(), true, false, null); + return next(cctx.kernalContext().discovery().topologyVersion(), true, false, dataCenterId); } /** @@ -196,7 +196,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { * @return Next version based on given topology version. */ public GridCacheVersion next(AffinityTopologyVersion topVer) { - return next(topVer.topologyVersion(), true, false, null); + return next(topVer.topologyVersion(), true, false, dataCenterId); } /** @@ -205,7 +205,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { * @return Next version for cache store operations. */ public GridCacheVersion nextForLoad() { - return next(cctx.kernalContext().discovery().topologyVersion(), true, true, null); + return next(cctx.kernalContext().discovery().topologyVersion(), true, true, dataCenterId); } /** @@ -214,7 +214,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { * @return Next version for cache store operations. */ public GridCacheVersion nextForLoad(AffinityTopologyVersion topVer) { - return next(topVer.topologyVersion(), true, true, null); + return next(topVer.topologyVersion(), true, true, dataCenterId); } /** @@ -223,7 +223,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { * @return Next version for cache store operations. */ public GridCacheVersion nextForLoad(GridCacheVersion ver) { - return next(ver.topologyVersion(), false, true, null); + return next(ver.topologyVersion(), false, true, dataCenterId); } /** @@ -233,7 +233,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { * @return Next version based on given cache version. */ public GridCacheVersion next(GridCacheVersion ver) { - return next(ver.topologyVersion(), false, false, null); + return next(ver.topologyVersion(), false, false, dataCenterId); } /** @@ -245,10 +245,10 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { * @param topVer Topology version for which new version should be obtained. * @param addTime If {@code true} then adds to the given topology version number of seconds * from the start time of the first grid node. - * @param dataCenterId0 Data center id. + * @param dataCenterId Data center id. * @return New lock order. */ - private GridCacheVersion next(long topVer, boolean addTime, boolean forLoad, Byte dataCenterId0) { + private GridCacheVersion next(long topVer, boolean addTime, boolean forLoad, byte dataCenterId) { if (topVer == -1) topVer = cctx.kernalContext().discovery().topologyVersion(); @@ -270,7 +270,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { globalTime, ord, locNodeOrder, - dataCenterId0 == null ? dataCenterId : dataCenterId0); + dataCenterId); last = next;