http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index b8c78bd..9493510 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -62,7 +62,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker; -import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTrackerImpl; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; @@ -72,6 +71,7 @@ import org.apache.ignite.internal.processors.cache.transactions.TransactionProxy import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl; import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyRollbackOnlyImpl; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.query.EnlistOperation; import org.apache.ignite.internal.processors.query.UpdateSourceIterator; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; @@ -95,6 +95,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiClosure; +import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; @@ -191,11 +192,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou /** */ private MvccQueryTracker mvccTracker; - /** Whether this transaction is for SQL operations or not.<p> + /** Whether this is Mvcc transaction or not.<p> * {@code null} means there haven't been any calls made on this transaction, and first operation will give this * field actual value. */ - private Boolean sql; + private Boolean mvccOp; /** * Empty constructor required for {@link Externalizable}. @@ -205,7 +206,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou } /** - * @param ctx Cache registry. + * @param ctx Cache registry. * @param implicit Implicit flag. * @param implicitSingle Implicit with one key flag. * @param sys System flag. @@ -214,7 +215,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou * @param isolation Isolation. * @param timeout Timeout. * @param storeEnabled Store enabled flag. - * @param sql Whether this transaction was started via SQL API or not, or {@code null} if unknown. + * @param mvccOp Whether this transaction was started via SQL API or not, or {@code null} if unknown. * @param txSize Transaction size. * @param subjId Subject ID. * @param taskNameHash Task name hash code. @@ -230,7 +231,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou TransactionIsolation isolation, long timeout, boolean storeEnabled, - Boolean sql, + Boolean mvccOp, int txSize, @Nullable UUID subjId, int taskNameHash, @@ -257,7 +258,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou mappings = implicitSingle ? new IgniteTxMappingsSingleImpl() : new IgniteTxMappingsImpl(); - this.sql = sql; + this.mvccOp = mvccOp; initResult(); @@ -574,6 +575,10 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou ) { assert key != null; + if (cacheCtx.mvccEnabled()) + return mvccPutAllAsync0(cacheCtx, Collections.singletonMap(key, val), + entryProcessor == null ? null : Collections.singletonMap(key, entryProcessor), invokeArgs, retval, filter); + try { beforePut(cacheCtx, retval, false); @@ -628,7 +633,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou if (log.isDebugEnabled()) log.debug("Before acquiring transaction lock for put on key: " + enlisted); - IgniteInternalFuture<Boolean>fut = cacheCtx.cache().txLockAsync(enlisted, + IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted, timeout, this, /*read*/entryProcessor != null, // Needed to force load from store. @@ -696,6 +701,142 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou } /** + * Validate Tx mode. + * + * @param ctx Cache context. + * @throws IgniteCheckedException If tx mode is not supported. + */ + protected void validateTxMode(GridCacheContext ctx) throws IgniteCheckedException { + if(!ctx.mvccEnabled() || pessimistic() && repeatableRead()) + return; + + throw new IgniteCheckedException("Only pessimistic repeatable read transactions are supported at the moment."); + } + + /** + * Internal method for put and transform operations in Mvcc mode. + * Note: Only one of {@code map}, {@code transformMap} maps must be non-null. + * + * @param cacheCtx Context. + * @param map Key-value map to store. + * @param invokeMap Invoke map. + * @param invokeArgs Optional arguments for EntryProcessor. + * @param retval Key-transform value map to store. + * @param filter Filter. + * @return Operation future. + */ + private <K, V> IgniteInternalFuture mvccPutAllAsync0( + final GridCacheContext cacheCtx, + @Nullable Map<? extends K, ? extends V> map, + @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> invokeMap, + @Nullable final Object[] invokeArgs, + final boolean retval, + @Nullable final CacheEntryPredicate filter + ) { + try { + validateTxMode(cacheCtx); + + // TODO: IGNITE-9540: Fix invoke/invokeAll. + if(invokeMap != null) + MvccUtils.verifyMvccOperationSupport(cacheCtx, "invoke/invokeAll"); + + if (mvccSnapshot == null) { + MvccUtils.mvccTracker(cacheCtx, this); + + assert mvccSnapshot != null; + } + + beforePut(cacheCtx, retval, true); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture(e); + } + + // Cached entry may be passed only from entry wrapper. + final Map<?, ?> map0 = map; + final Map<?, EntryProcessor<K, V, Object>> invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap; + + if (log.isDebugEnabled()) + log.debug("Called putAllAsync(...) [tx=" + this + ", map=" + map0 + ", retval=" + retval + "]"); + + assert map0 != null || invokeMap0 != null; + + if (F.isEmpty(map0) && F.isEmpty(invokeMap0)) { + if (implicit()) + try { + commit(); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); + } + + return new GridFinishedFuture<>(new GridCacheReturn(true, false)); + } + + try { + // Set transform flag for transaction. + if (invokeMap != null) + transform = true; + + Set<?> keys = map0 != null ? map0.keySet() : invokeMap0.keySet(); + + final Map<KeyCacheObject, CacheObject> enlisted = new HashMap<>(keys.size()); + + for (Object key : keys) { + if (isRollbackOnly()) + return new GridFinishedFuture<>(timedOut() ? timeoutException() : rollbackException()); + + if (key == null) { + rollback(); + + throw new NullPointerException("Null key."); + } + + Object val = map0 == null ? null : map0.get(key); + EntryProcessor entryProcessor = transform ? invokeMap.get(key) : null; + + if (val == null && entryProcessor == null) { + setRollbackOnly(); + + throw new NullPointerException("Null value."); + } + + KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key); + CacheObject cacheVal = cacheCtx.toCacheObject(val); + + enlisted.put(cacheKey, cacheVal); + } + + return updateAsync(cacheCtx, new UpdateSourceIterator<IgniteBiTuple<KeyCacheObject, CacheObject>>() { + + private Iterator<Map.Entry<KeyCacheObject, CacheObject>> it = enlisted.entrySet().iterator(); + + @Override public EnlistOperation operation() { + return EnlistOperation.UPSERT; + } + + @Override public boolean hasNextX() throws IgniteCheckedException { + return it.hasNext(); + } + + @Override public IgniteBiTuple<KeyCacheObject, CacheObject> nextX() throws IgniteCheckedException { + Map.Entry<KeyCacheObject, CacheObject> next = it.next(); + + return new IgniteBiTuple<>(next.getKey(), next.getValue()); + } + }, retval, filter, remainingTime(), true); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture(e); + } + catch (RuntimeException e) { + onException(); + + throw e; + } + } + + /** * Internal method for all put and transform operations. Only one of {@code map}, {@code transformMap} * maps must be non-null. * @@ -717,6 +858,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou @Nullable Map<KeyCacheObject, GridCacheDrInfo> drMap, final boolean retval ) { + if (cacheCtx.mvccEnabled()) + return mvccPutAllAsync0(cacheCtx, map, invokeMap, invokeArgs, retval, null); + try { beforePut(cacheCtx, retval, false); } @@ -1549,6 +1693,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou final boolean retval, @Nullable final CacheEntryPredicate filter, boolean singleRmv) { + if(cacheCtx.mvccEnabled()) + return mvccRemoveAllAsync0(cacheCtx, keys, retval, filter); + try { checkUpdatesAllowed(cacheCtx); } @@ -1558,9 +1705,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou cacheCtx.checkSecurity(SecurityPermission.CACHE_REMOVE); - if (cacheCtx.mvccEnabled() && !isOperationAllowed(false)) - return txTypeMismatchFinishFuture(); - if (retval) needReturnValue(true); @@ -1690,9 +1834,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou -1L); PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) { + /** {@inheritDoc} */ @Override protected GridCacheReturn postLock(GridCacheReturn ret) - throws IgniteCheckedException - { + throws IgniteCheckedException { if (log.isDebugEnabled()) log.debug("Acquired transaction lock for remove on keys: " + enlisted); @@ -1769,6 +1913,93 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou } /** + * Internal method for remove operations in Mvcc mode. + * + * @param cacheCtx Cache context. + * @param keys Keys to remove. + * @param retval Flag indicating whether a value should be returned. + * @param filter Filter. + * @return Future for asynchronous remove. + */ + @SuppressWarnings("unchecked") + private <K, V> IgniteInternalFuture<GridCacheReturn> mvccRemoveAllAsync0( + final GridCacheContext cacheCtx, + @Nullable final Collection<? extends K> keys, + final boolean retval, + @Nullable final CacheEntryPredicate filter + ) { + try { + validateTxMode(cacheCtx); + + if (mvccSnapshot == null) { + MvccUtils.mvccTracker(cacheCtx, this); + + assert mvccSnapshot != null; + } + + beforeRemove(cacheCtx, retval, true); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture(e); + } + + if (F.isEmpty(keys)) { + if (implicit()) { + try { + commit(); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); + } + } + + return new GridFinishedFuture<>(new GridCacheReturn(localResult(), true)); + } + + init(); + + Set<KeyCacheObject> enlisted = new HashSet<>(keys.size()); + + try { + for (Object key : keys) { + if (isRollbackOnly()) + return new GridFinishedFuture<>(timedOut() ? timeoutException() : rollbackException()); + + if (key == null) { + rollback(); + + throw new NullPointerException("Null key."); + } + + KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key); + + enlisted.add(cacheKey); + } + + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture(e); + } + + return updateAsync(cacheCtx, new UpdateSourceIterator<KeyCacheObject>() { + + private Iterator<KeyCacheObject> it = enlisted.iterator(); + + @Override public EnlistOperation operation() { + return EnlistOperation.DELETE; + } + + @Override public boolean hasNextX() throws IgniteCheckedException { + return it.hasNext(); + } + + @Override public KeyCacheObject nextX() throws IgniteCheckedException { + return it.next(); + } + }, retval, filter, remainingTime(), true); + } + + /** * @param cctx Cache context. * @return Mvcc snapshot for read inside tx (initialized once for OPTIMISTIC SERIALIZABLE and REPEATABLE_READ txs). */ @@ -1846,10 +2077,67 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou } /** + * Executes key-value update operation in Mvcc mode. + * + * @param cacheCtx Cache context. + * @param it Entries iterator. + * @param retval Return value flag. + * @param filter Filter. + * @param timeout Timeout. + * @param sequential Sequential locking flag. + * @return Operation future. + */ + private IgniteInternalFuture<GridCacheReturn> updateAsync(GridCacheContext cacheCtx, + UpdateSourceIterator<?> it, + boolean retval, + @Nullable CacheEntryPredicate filter, + long timeout, + boolean sequential) { + try { + final CacheOperationContext opCtx = cacheCtx.operationContextPerCall(); + + final boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); + + /* TODO: IGNITE-9688: 'sequential' is always true here which can slowdown bulk operations, + but possibly we can safely optimize this. */ + + GridNearTxEnlistFuture fut = new GridNearTxEnlistFuture(cacheCtx, this, + timeout, it, 0, sequential, filter, retval); + + fut.init(); + + return nonInterruptable(new GridEmbeddedFuture<>(fut.chain(new CX1<IgniteInternalFuture<GridCacheReturn>, Boolean>() { + @Override public Boolean applyx(IgniteInternalFuture<GridCacheReturn> fut0) throws IgniteCheckedException { + fut0.get(); + + return true; + } + }), new PLC1<GridCacheReturn>(null) { + @Override protected GridCacheReturn postLock(GridCacheReturn ret) throws IgniteCheckedException { + GridCacheReturn futRes = fut.get(); + + assert futRes != null; + + mvccSnapshot.incrementOperationCounter(); + + return new GridCacheReturn(cacheCtx, true, keepBinary, futRes.value(), futRes.success()); + } + })); + } + catch (RuntimeException e) { + onException(); + + throw e; + } + } + + /** + * Executes update query operation in Mvcc mode. + * * @param fut Enlist future. * @return Operation future. */ - public IgniteInternalFuture<Long> updateAsync(GridNearTxAbstractEnlistFuture fut) { + public IgniteInternalFuture<Long> updateAsync(GridNearTxQueryAbstractEnlistFuture fut) { fut.init(); return nonInterruptable(new GridEmbeddedFuture<>(fut.chain(new CX1<IgniteInternalFuture<Long>, Boolean>() { @@ -1900,36 +2188,18 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou if (F.isEmpty(keys)) return new GridFinishedFuture<>(Collections.<K, V>emptyMap()); - if (cacheCtx.mvccEnabled() && !isOperationAllowed(false)) + try { + validateTxMode(cacheCtx); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture(e); + } + + if (cacheCtx.mvccEnabled() && !isOperationAllowed(true)) return txTypeMismatchFinishFuture(); init(); - if (cacheCtx.mvccEnabled() && (optimistic() && !readCommitted()) && mvccTracker == null) { - // TODO IGNITE-7388: support async tx rollback (e.g. on timeout). - boolean canRemap = cctx.lockedTopologyVersion(null) == null; - - mvccTracker = new MvccQueryTrackerImpl(cacheCtx, canRemap); - - return new GridEmbeddedFuture<>(mvccTracker.requestSnapshot(topologyVersion()), - new IgniteBiClosure<MvccSnapshot, Exception, IgniteInternalFuture<Map<K, V>>>() { - @Override public IgniteInternalFuture<Map<K, V>> apply(MvccSnapshot snapshot, Exception e) { - if (e != null) - return new GridFinishedFuture<>(e); - - return getAllAsync(cacheCtx, - entryTopVer, - keys, - deserializeBinary, - skipVals, - keepCacheObjects, - skipStore, - recovery, - needVer); - } - }); - } - int keysCnt = keys.size(); boolean single = keysCnt == 1; @@ -2234,8 +2504,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou * @param keepCacheObjects Keep cache objects flag. * @param skipStore Skip store flag. * @param recovery Recovery flag.. - * @throws IgniteCheckedException If failed. * @return Enlisted keys. + * @throws IgniteCheckedException If failed. */ @SuppressWarnings({"RedundantTypeArguments"}) private <K, V> Collection<KeyCacheObject> enlistRead( @@ -2568,8 +2838,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c = new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() { @Override public void apply(KeyCacheObject key, - @Nullable Object val, - @Nullable GridCacheVersion loadVer) { + @Nullable Object val, + @Nullable GridCacheVersion loadVer) { if (log.isDebugEnabled()) log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']'); @@ -2729,7 +2999,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou } /** - * @param cacheCtx Cache context. + * @param cacheCtx Cache context. * @param readThrough Read through flag. * @param async if {@code True}, then loading will happen in a separate thread. * @param keys Keys. @@ -2868,7 +3138,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou } /** - * @param cacheCtx Cache context. + * @param cacheCtx Cache context. * @param readThrough Read through flag. * @param async if {@code True}, then loading will happen in a separate thread. * @param keys Keys. @@ -3498,7 +3768,10 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou if (log.isDebugEnabled()) log.debug("Committing near local tx: " + this); - final NearTxFinishFuture fut, fut0 = finishFut; boolean fastFinish; + final NearTxFinishFuture fut; + final NearTxFinishFuture fut0 = finishFut; + + boolean fastFinish; if (fut0 != null || !FINISH_FUT_UPD.compareAndSet(this, null, fut = finishFuture(fastFinish = fastFinish(), true))) return chainFinishFuture(finishFut, true, true, false); @@ -3577,9 +3850,12 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou IgniteInternalFuture<?> prepFut = this.prepFut; if (onTimeout && prepFut instanceof GridNearTxPrepareFutureAdapter && !prepFut.isDone()) - ((GridNearTxPrepareFutureAdapter) prepFut).onNearTxLocalTimeout(); + ((GridNearTxPrepareFutureAdapter)prepFut).onNearTxLocalTimeout(); + + final NearTxFinishFuture fut; + final NearTxFinishFuture fut0 = finishFut; - final NearTxFinishFuture fut, fut0 = finishFut; boolean fastFinish; + boolean fastFinish; if (fut0 != null) return chainFinishFuture(finishFut, false, clearThreadMap, onTimeout); @@ -3627,9 +3903,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou } /** - * @return Transaction commit future. + * Finish transaction. + * * @param fast {@code True} in case of fast finish. * @param commit {@code True} if commit. + * @return Transaction commit future. */ private NearTxFinishFuture finishFuture(boolean fast, boolean commit) { NearTxFinishFuture fut = fast ? new GridNearTxFastFinishFuture(this, commit) : @@ -3724,7 +4002,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou * @return {@code True} if 'fast finish' path can be used for transaction completion. */ private boolean fastFinish() { - return writeMap().isEmpty() + return writeMap().isEmpty() && !queryEnlisted() && ((optimistic() && !serializable()) || readMap().isEmpty()) && (mappings.single() || F.view(mappings.mappings(), CU.FILTER_QUERY_MAPPING).isEmpty()); } @@ -4174,14 +4452,14 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou * @return {@code true} if this transaction does not have type flag set or it matches invoking operation, * {@code false} otherwise. */ - public boolean isOperationAllowed(boolean sqlOp) { - if (sql == null) { - sql = sqlOp; + public boolean isOperationAllowed(boolean mvccOp) { + if (this.mvccOp == null) { + this.mvccOp = mvccOp; return true; } - return sql == sqlOp; + return this.mvccOp == mvccOp; } /** @@ -4385,17 +4663,17 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou /** * @param cacheCtx Cache context. * @param retval Return value flag. - * @param sql SQL operation flag. + * @param mvccOp SQL operation flag. * @throws IgniteCheckedException If failed. */ - private void beforePut(GridCacheContext cacheCtx, boolean retval, boolean sql) throws IgniteCheckedException { - assert !sql || cacheCtx.mvccEnabled(); + private void beforePut(GridCacheContext cacheCtx, boolean retval, boolean mvccOp) throws IgniteCheckedException { + assert !mvccOp || cacheCtx.mvccEnabled(); checkUpdatesAllowed(cacheCtx); cacheCtx.checkSecurity(SecurityPermission.CACHE_PUT); - if (cacheCtx.mvccEnabled() && !isOperationAllowed(sql)) + if (cacheCtx.mvccEnabled() && !isOperationAllowed(mvccOp)) throw new IgniteCheckedException(TX_TYPE_MISMATCH_ERR_MSG); if (retval) @@ -4408,6 +4686,28 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou /** * @param cacheCtx Cache context. + * @param retval Return value flag. + * @param mvccOp SQL operation flag. + * @throws IgniteCheckedException If failed. + */ + private void beforeRemove(GridCacheContext cacheCtx, boolean retval, boolean mvccOp) throws IgniteCheckedException { + assert !mvccOp || cacheCtx.mvccEnabled(); + + checkUpdatesAllowed(cacheCtx); + + cacheCtx.checkSecurity(SecurityPermission.CACHE_REMOVE); + + if (cacheCtx.mvccEnabled() && !isOperationAllowed(mvccOp)) + throw new IgniteCheckedException(TX_TYPE_MISMATCH_ERR_MSG); + + if (retval) + needReturnValue(true); + + checkValid(); + } + + /** + * @param cacheCtx Cache context. * @throws IgniteCheckedException If updates are not allowed. */ private void checkUpdatesAllowed(GridCacheContext cacheCtx) throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryAbstractEnlistFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryAbstractEnlistFuture.java new file mode 100644 index 0000000..714c62d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryAbstractEnlistFuture.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.util.typedef.internal.CU; + +/** + * + */ +public abstract class GridNearTxQueryAbstractEnlistFuture extends GridNearTxAbstractEnlistFuture<Long> { + /** + * @param cctx Cache context. + * @param tx Transaction. + * @param timeout Timeout. + */ + public GridNearTxQueryAbstractEnlistFuture( + GridCacheContext<?, ?> cctx, GridNearTxLocal tx, long timeout) { + super(cctx, tx, timeout, CU.longReducer()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java index 9a2dfa3..6d48b97 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java @@ -43,9 +43,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.NearTx * Cache lock future. */ @SuppressWarnings("ForLoopReplaceableByForEach") -public class GridNearTxQueryEnlistFuture extends GridNearTxAbstractEnlistFuture { - /** */ - private static final long serialVersionUID = -2155104765461405820L; +public class GridNearTxQueryEnlistFuture extends GridNearTxQueryAbstractEnlistFuture { /** Involved cache ids. */ private final int[] cacheIds; http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistResponse.java index dae1e81..d628de1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistResponse.java @@ -38,7 +38,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; /** - * + * A response to {@link GridNearTxQueryEnlistRequest}. */ public class GridNearTxQueryEnlistResponse extends GridCacheIdMessage implements ExceptionAware { /** */ @@ -99,6 +99,7 @@ public class GridNearTxQueryEnlistResponse extends GridCacheIdMessage implements * @param lockVer Lock version. * @param res Result. * @param removeMapping Remove mapping flag. + * @param newDhtNodes New DHT nodes involved into transaction. */ public GridNearTxQueryEnlistResponse(int cacheId, IgniteUuid futId, int miniId, GridCacheVersion lockVer, long res, boolean removeMapping, Set<UUID> newDhtNodes) { http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java index 2452b92..b83339b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java @@ -62,10 +62,7 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA * A future tracking requests for remote nodes transaction enlisting and locking * of entries produced with complex DML queries requiring reduce step. */ -public class GridNearTxQueryResultsEnlistFuture extends GridNearTxAbstractEnlistFuture { - /** */ - private static final long serialVersionUID = 4339957209840477447L; - +public class GridNearTxQueryResultsEnlistFuture extends GridNearTxQueryAbstractEnlistFuture { /** */ public static final int DFLT_BATCH_SIZE = 1024; http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistResponse.java index 94cacfa..48c63bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistResponse.java @@ -54,7 +54,7 @@ public class GridNearTxQueryResultsEnlistResponse extends GridNearTxQueryEnlistR * @param res Result. * @param dhtFutId Dht future id. * @param dhtVer Dht version. - * @param newDhtNodes New + * @param newDhtNodes New DHT nodes involved into transaction. */ public GridNearTxQueryResultsEnlistResponse(int cacheId, IgniteUuid futId, http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java index ff1c85f..ca77bf9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java @@ -1233,7 +1233,8 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce for (TxKey key : waitMap.keySet()) { assert key.major() == snapshot.coordinatorVersion() && key.minor() > snapshot.cleanupVersion() - || key.major() > snapshot.coordinatorVersion(); + || key.major() > snapshot.coordinatorVersion() : + "key=" + key + ", snapshot=" + snapshot; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java index f46d1e0..9a767ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteInClosure; import org.jetbrains.annotations.NotNull; @@ -46,7 +47,6 @@ public class MvccQueryTrackerImpl implements MvccQueryTracker { private final IgniteLogger log; /** */ - @GridToStringExclude private long crdVer; /** */ @@ -259,6 +259,9 @@ public class MvccQueryTrackerImpl implements MvccQueryTracker { IgniteInternalFuture<AffinityTopologyVersion> waitFut = cctx.shared().exchange().affinityReadyFuture(topVer.nextMinorVersion()); + if(log.isDebugEnabled()) + log.debug("Remap on new topology: " + waitFut); + if (waitFut == null) requestSnapshot(cctx.shared().exchange().readyAffinityVersion(), lsnr); else { @@ -325,6 +328,11 @@ public class MvccQueryTrackerImpl implements MvccQueryTracker { return true; } + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MvccQueryTrackerImpl.class, this); + } + /** */ private final class ListenerDecorator implements MvccSnapshotResponseListener { /** */ http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index c57a790..16c30c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -49,6 +49,7 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateNextSna import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -1795,13 +1796,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple GridCacheVersion ver, long expireTime, MvccSnapshot mvccVer, + CacheEntryPredicate filter, boolean primary, boolean needHistory, - boolean noCreate) throws IgniteCheckedException { + boolean noCreate, + boolean retVal) throws IgniteCheckedException { CacheDataStore delegate = init0(false); - return delegate.mvccUpdate( - cctx, key, val, ver, expireTime, mvccVer, primary, needHistory, noCreate); + return delegate.mvccUpdate(cctx, key, val, ver, expireTime, mvccVer, filter, primary, + needHistory, noCreate, retVal); } /** {@inheritDoc} */ @@ -1809,11 +1812,13 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple GridCacheContext cctx, KeyCacheObject key, MvccSnapshot mvccVer, + CacheEntryPredicate filter, boolean primary, - boolean needHistory) throws IgniteCheckedException { + boolean needHistory, + boolean retVal) throws IgniteCheckedException { CacheDataStore delegate = init0(false); - return delegate.mvccRemove(cctx, key, mvccVer, primary, needHistory); + return delegate.mvccRemove(cctx, key, mvccVer, filter, primary, needHistory, retVal); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index fb6293c..d0e3dca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1720,10 +1720,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig */ public void markQueryEnlisted(MvccSnapshot ver) { if (!qryEnlisted) { + assert ver != null || mvccSnapshot != null; + if (mvccSnapshot == null) mvccSnapshot = ver; - cctx.coordinators().registerLocalTransaction(ver.coordinatorVersion(), ver.counter()); + if(dht()) + cctx.coordinators().registerLocalTransaction(mvccSnapshot.coordinatorVersion(), mvccSnapshot.counter()); qryEnlisted = true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 7cc3e55..438c8ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -466,7 +466,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @param concurrency Concurrency. * @param isolation Isolation. * @param timeout transaction timeout. - * @param sql Whether this transaction is being started via SQL API or not, or {@code null} if unknown. + * @param mvccOp Whether this transaction is being started via SQL API or not, or {@code null} if unknown. * @param txSize Expected transaction size. * @param lb Label. * @return New transaction. @@ -479,7 +479,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { TransactionIsolation isolation, long timeout, boolean storeEnabled, - Boolean sql, + Boolean mvccOp, int txSize, @Nullable String lb ) { @@ -499,7 +499,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { isolation, timeout, storeEnabled, - sql, + mvccOp, txSize, subjId, taskNameHash, http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccUpdateDataRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccUpdateDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccUpdateDataRow.java index 716094e..2a0b582 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccUpdateDataRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccUpdateDataRow.java @@ -21,9 +21,12 @@ import java.util.ArrayList; import java.util.List; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; +import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils; import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion; @@ -38,6 +41,8 @@ import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccLinkAwar import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_COUNTER_NA; import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_CRD_COUNTER_NA; @@ -49,6 +54,7 @@ import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.isActiv import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.isVisible; import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccVersionIsValid; import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.unexpectedStateException; +import static org.apache.ignite.internal.processors.cache.tree.mvcc.data.ResultType.FILTERED; /** * @@ -94,6 +100,9 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult, /** Whether tx has overridden it's own update. */ private static final int OWN_VALUE_OVERRIDDEN = DELETED << 1; + /** Force read full entry instead of header only. */ + private static final int NEED_PREV_VALUE = OWN_VALUE_OVERRIDDEN << 1; + /** */ @GridToStringExclude private final GridCacheContext cctx; @@ -125,6 +134,10 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult, /** */ private List<MvccLinkAwareSearchRow> historyRows; + /** */ + @GridToStringExclude + private CacheEntryPredicate filter; + /** * @param cctx Cache context. * @param key Key. @@ -148,10 +161,12 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult, long expireTime, MvccSnapshot mvccSnapshot, MvccVersion newVer, + @Nullable CacheEntryPredicate filter, boolean primary, boolean lockOnly, boolean needHistory, - boolean fastUpdate) { + boolean fastUpdate, + boolean needPrevValue) { super(key, val, ver, @@ -163,6 +178,7 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult, this.mvccSnapshot = mvccSnapshot; this.cctx = cctx; + this.filter = filter; this.keyAbsentBefore = primary; // True for primary and false for backup (backups do not use this flag). assert !lockOnly || val == null; @@ -181,6 +197,9 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult, if (fastUpdate) flags |= FAST_UPDATE; + if(needPrevValue) + flags |= NEED_PREV_VALUE; + setFlags(flags); } @@ -237,8 +256,18 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult, if (removed) setFlags(DELETED); - else - oldRow = row; + else { + // Actually, full row can be omitted for replace(k,newval) and putIfAbsent, but + // operation context is not available here and full row required if filter is set. + if (res == ResultType.PREV_NOT_NULL && (isFlagsSet(NEED_PREV_VALUE) || filter != null)) + oldRow = tree.getRow(io, pageAddr, idx, RowData.FULL); + else + oldRow = row; + } + + // TODO: IGNITE-9689: optimize filter usage here. See {@link org.apache.ignite.internal.processors.cache.CacheOperationFilter}. + if(filter != null && !applyFilter(res == ResultType.PREV_NOT_NULL ? oldRow.value() : null)) + res = FILTERED; setFlags(LAST_COMMITTED_FOUND | OWN_VALUE_OVERRIDDEN); @@ -293,9 +322,14 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult, else { res = ResultType.PREV_NOT_NULL; - oldRow = row; - keyAbsentBefore = false; + + // Actually, full row can be omitted for replace(k,newval) and putIfAbsent, but + // operation context is not available here and full row required if filter is set. + if( (isFlagsSet(NEED_PREV_VALUE) || filter != null)) + oldRow = tree.getRow(io, pageAddr, idx, RowData.FULL); + else + oldRow = row; } if (isFlagsSet(CHECK_VERSION)) { @@ -337,9 +371,13 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult, } } + // TODO: IGNITE-9689: optimize filter usage here. See {@link org.apache.ignite.internal.processors.cache.CacheOperationFilter}. + if(filter != null && !applyFilter(res == ResultType.PREV_NOT_NULL ? oldRow.value() : null)) + res = FILTERED; + // Lock entry for primary partition if needed. // If invisible row is found for FAST_UPDATE case we should not lock row. - if (isFlagsSet(PRIMARY | REMOVE_OR_LOCK) && !isFlagsSet(FAST_MISMATCH)) { + if (!isFlagsSet(DELETED) && isFlagsSet(PRIMARY | REMOVE_OR_LOCK) && !isFlagsSet(FAST_MISMATCH)) { rowIo.setMvccLockCoordinatorVersion(pageAddr, idx, mvccCrd); rowIo.setMvccLockCounter(pageAddr, idx, mvccCntr); @@ -423,6 +461,22 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult, return unsetFlags(FIRST); } + /** + * Apply filter. + * + * @param val0 Previous value. + * @return Filter result. + */ + private boolean applyFilter(final CacheObject val0) { + GridCacheEntryEx e = new GridDhtDetachedCacheEntry(cctx, key) { + @Nullable @Override public CacheObject peekVisibleValue() { + return val0; + } + }; + + return filter.apply(e); + } + /** {@inheritDoc} */ @Override public int state() { return state; @@ -436,10 +490,26 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult, } /** - * @return {@code True} if previous value was non-null. + * @return Result type. */ - @Override public ResultType resultType() { - return res == null ? ResultType.PREV_NULL : res; + @NotNull @Override public ResultType resultType() { + return res == null ? defaultResult() : res; + } + + /** + * Evaluate default result type. + * + * @return Result type. + */ + @NotNull private ResultType defaultResult() { + assert res == null; + + if (filter != null && !applyFilter(null)) + res = FILTERED; + else + res = ResultType.PREV_NULL; // Default. + + return res; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/ResultType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/ResultType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/ResultType.java index eecb4a5..16e7e1e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/ResultType.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/ResultType.java @@ -30,5 +30,7 @@ public enum ResultType { /** */ LOCKED, /** */ - VERSION_MISMATCH + VERSION_MISMATCH, + /** */ + FILTERED } http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index 9bbf03d..1a3c8d7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -483,8 +483,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr /** {@inheritDoc} */ @Override public GridCacheUpdateTxResult mvccSet(@Nullable IgniteInternalTx tx, UUID affNodeId, CacheObject val, long ttl0, AffinityTopologyVersion topVer, MvccSnapshot mvccVer, - GridCacheOperation op, boolean needHistory, - boolean noCreate) throws IgniteCheckedException, GridCacheEntryRemovedException { + GridCacheOperation op, boolean needHistory, boolean noCreate, CacheEntryPredicate filter, boolean retVal) + throws IgniteCheckedException, GridCacheEntryRemovedException { rawPut(val, ttl); return new GridCacheUpdateTxResult(true); @@ -492,7 +492,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr /** {@inheritDoc} */ @Override public GridCacheUpdateTxResult mvccRemove(@Nullable IgniteInternalTx tx, UUID affNodeId, - AffinityTopologyVersion topVer, MvccSnapshot mvccVer, boolean needHistory) + AffinityTopologyVersion topVer, MvccSnapshot mvccVer, boolean needHistory, + CacheEntryPredicate filter, boolean retVal) throws IgniteCheckedException, GridCacheEntryRemovedException { obsoleteVer = ver; http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxIteratorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxIteratorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxIteratorSelfTest.java index dfc8b05..6a00ea4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxIteratorSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxIteratorSelfTest.java @@ -27,6 +27,9 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.configuration.TransactionConfiguration; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; +import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; @@ -154,6 +157,13 @@ public class IgniteCacheTxIteratorSelfTest extends GridCommonAbstractTest { for (TransactionIsolation iso : TransactionIsolation.values()) { for (TransactionConcurrency con : TransactionConcurrency.values()) { try (Transaction transaction = ignite.transactions().txStart(con, iso)) { + //TODO: IGNITE-7187: Fix when ticket will be implemented. (Near cache) + //TODO: IGNITE-7956: Fix when ticket will be implemented. (Eviction) + if (((IgniteCacheProxy)cache).context().mvccEnabled() && + ((iso != TransactionIsolation.REPEATABLE_READ && con != TransactionConcurrency.PESSIMISTIC) + || nearEnabled || useEvicPlc)) + return; // Nothing to do. Mode is not supported. + assertEquals(val, cache.get(key)); transaction.commit(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractBasicCoordinatorFailoverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractBasicCoordinatorFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractBasicCoordinatorFailoverTest.java index b2cbe05..c1718b5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractBasicCoordinatorFailoverTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractBasicCoordinatorFailoverTest.java @@ -18,11 +18,11 @@ package org.apache.ignite.internal.processors.cache.mvcc; import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; @@ -52,10 +52,8 @@ import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; -import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; -import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; /** * Base class for Mvcc coordinator failover test. @@ -73,6 +71,8 @@ public abstract class CacheMvccAbstractBasicCoordinatorFailoverTest extends Cach ReadMode readMode, WriteMode writeMode ) throws Exception { + assert concurrency == PESSIMISTIC && isolation == REPEATABLE_READ; + testSpi = true; startGrids(3); @@ -169,7 +169,7 @@ public abstract class CacheMvccAbstractBasicCoordinatorFailoverTest extends Cach final int KEYS = 100; - final Map<Integer, Integer> vals = new HashMap<>(); + final Map<Integer, Integer> vals = new LinkedHashMap<>(); for (int i = 0; i < KEYS; i++) vals.put(i, 0); @@ -298,7 +298,7 @@ public abstract class CacheMvccAbstractBasicCoordinatorFailoverTest extends Cach Integer val = 1; while (!done.get()) { - Map<Integer, Integer> vals = new HashMap<>(); + Map<Integer, Integer> vals = new LinkedHashMap<>(); for (int i = 0; i < KEYS; i++) vals.put(i, val); @@ -479,9 +479,6 @@ public abstract class CacheMvccAbstractBasicCoordinatorFailoverTest extends Cach ", crdChangeCnt=" + crdChangeCnt + ", readInTx=" + readInTx + ']'); - TransactionConcurrency concurrency = readMode == ReadMode.GET ? OPTIMISTIC : PESSIMISTIC; // TODO IGNITE-7184 - TransactionIsolation isolation = readMode == ReadMode.GET ? SERIALIZABLE : REPEATABLE_READ; // TODO IGNITE-7184 - testSpi = true; client = false; @@ -510,7 +507,7 @@ public abstract class CacheMvccAbstractBasicCoordinatorFailoverTest extends Cach final IgniteCache cache = getNode.createCache(ccfg); - final Set<Integer> keys = new HashSet<>(); + final Set<Integer> keys = new TreeSet<>(); List<Integer> keys1 = primaryKeys(jcache(COORDS), 10); List<Integer> keys2 = primaryKeys(jcache(COORDS + 1), 10); @@ -518,7 +515,7 @@ public abstract class CacheMvccAbstractBasicCoordinatorFailoverTest extends Cach keys.addAll(keys1); keys.addAll(keys2); - Map<Integer, Integer> vals = new HashMap(); + Map<Integer, Integer> vals = new LinkedHashMap(); for (Integer key : keys) vals.put(key, -1); @@ -544,7 +541,7 @@ public abstract class CacheMvccAbstractBasicCoordinatorFailoverTest extends Cach Map<Integer, Integer> res = null; if (readInTx) { - try (Transaction tx = getNode.transactions().txStart(concurrency, isolation)) { + try (Transaction tx = getNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { res = readAllByMode(cache, keys, readMode, INTEGER_CODEC); tx.rollback(); @@ -581,7 +578,7 @@ public abstract class CacheMvccAbstractBasicCoordinatorFailoverTest extends Cach stopGrid(i); for (int i = 0; i < 10; i++) { - vals = new HashMap(); + vals = new LinkedHashMap(); for (Integer key : keys) vals.put(key, i); @@ -636,7 +633,7 @@ public abstract class CacheMvccAbstractBasicCoordinatorFailoverTest extends Cach final IgniteCache cache = client.createCache(ccfg); - final Map<Integer, Integer> vals = new HashMap(); + final Map<Integer, Integer> vals = new LinkedHashMap<>(); for (int i = 0; i < 100; i++) vals.put(i, i); http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractCoordinatorFailoverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractCoordinatorFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractCoordinatorFailoverTest.java index 54e4315..60f1a2f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractCoordinatorFailoverTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractCoordinatorFailoverTest.java @@ -113,27 +113,6 @@ public abstract class CacheMvccAbstractCoordinatorFailoverTest extends CacheMvcc /** * @throws Exception If failed. */ - public void testCoordinatorFailureSimpleSerializableTxPutGet() throws Exception { - coordinatorFailureSimple(OPTIMISTIC, SERIALIZABLE, GET, PUT); - } - - /** - * @throws Exception If failed. - */ - public void testCoordinatorFailureSimpleOptimisticTxPutGet() throws Exception { - coordinatorFailureSimple(OPTIMISTIC, REPEATABLE_READ, GET, PUT); - } - - /** - * @throws Exception If failed. - */ - public void testTxInProgressCoordinatorChangeSimple_ReadonlyPutGet() throws Exception { - txInProgressCoordinatorChangeSimple(OPTIMISTIC, SERIALIZABLE, null, GET, PUT); - } - - /** - * @throws Exception If failed. - */ public void testReadInProgressCoordinatorFailsSimple_FromClientPutGet() throws Exception { readInProgressCoordinatorFailsSimple(true, null, GET, PUT); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractFeatureTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractFeatureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractFeatureTest.java index fe450d1..6c6b8df 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractFeatureTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractFeatureTest.java @@ -173,7 +173,7 @@ public abstract class CacheMvccAbstractFeatureTest extends CacheMvccAbstractTest int idx; do { - idx = (int) (Math.random() * 100) + 1; + idx = (int) (Math.random() * 100); } while (!keys.add(idx)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java index a4962d1..c191849 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java @@ -22,10 +22,13 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; @@ -45,6 +48,7 @@ import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cache.query.FieldsQueryCursor; +import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.SqlQuery; @@ -92,11 +96,9 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; -import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SCAN; import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL; import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL_SUM; import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.DML; -import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.PUT; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; @@ -223,18 +225,21 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { + persistence = false; + try { - verifyOldVersionsCleaned(); + if(disableScheduledVacuum) + verifyOldVersionsCleaned(); verifyCoordinatorInternalState(); } finally { stopAllGrids(); - } - MvccProcessorImpl.coordinatorAssignClosure(null); + MvccProcessorImpl.coordinatorAssignClosure(null); - cleanPersistenceDir(); + cleanPersistenceDir(); + } super.afterTest(); } @@ -420,7 +425,7 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { Integer id1 = Math.min(i1, i2); Integer id2 = Math.max(i1, i2); - TreeSet<Integer> keys = new TreeSet<>(); + Set<Integer> keys = new HashSet<>(); keys.add(id1); keys.add(id2); @@ -787,7 +792,7 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { * @param cache Cache. * @return All accounts */ - private static Map<Integer, MvccTestAccount> getAllSql(TestCache<Integer, MvccTestAccount> cache) { + protected static Map<Integer, MvccTestAccount> getAllSql(TestCache<Integer, MvccTestAccount> cache) { Map<Integer, MvccTestAccount> accounts = new HashMap<>(); SqlFieldsQuery qry = new SqlFieldsQuery("select _key, val, updateCnt from MvccTestAccount"); @@ -826,12 +831,28 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { * @param cache Cache. * @param key Key. */ - private static void removeSql(TestCache<Integer, MvccTestAccount> cache, Integer key) { + protected static void removeSql(TestCache<Integer, MvccTestAccount> cache, Integer key) { SqlFieldsQuery qry = new SqlFieldsQuery("delete from MvccTestAccount where _key=" + key); cache.cache.query(qry).getAll(); } + + /** + * Merge account by means of SQL API. + * + * @param cache Cache. + * @param key Key. + * @param val Value. + * @param updateCnt Update counter. + */ + protected static void mergeSql(TestCache<Integer, MvccTestAccount> cache, Integer key, Integer val, Integer updateCnt) { + SqlFieldsQuery qry = new SqlFieldsQuery("merge into MvccTestAccount(_key, val, updateCnt) values " + + " (" + key+ ", " + val + ", " + updateCnt + ")"); + + cache.cache.query(qry).getAll(); + } + /** * Inserts account by means of SQL API. * @@ -867,9 +888,6 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { ReadMode readMode, WriteMode writeMode ) throws Exception { - if(readMode == SCAN && writeMode == PUT) - fail("https://issues.apache.org/jira/browse/IGNITE-7764"); - final int RANGE = 20; final int writers = 4; @@ -886,15 +904,23 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { info("Thread range [min=" + min + ", max=" + max + ']'); - Map<Integer, Integer> map = new HashMap<>(); + // Sorted map for put to avoid deadlocks. + Map<Integer, Integer> map = new TreeMap<>(); + + // Unordered key sequence. + Set<Integer> keys = new LinkedHashSet<>(); int v = idx * 1_000_000; boolean first = true; while (!stop.get()) { - while (map.size() < RANGE) - map.put(rnd.nextInt(min, max), v); + while (keys.size() < RANGE) { + int key = rnd.nextInt(min, max); + + if (keys.add(key)) + map.put(key, v); + } TestCache<Integer, Integer> cache = randomCache(caches, rnd); @@ -903,9 +929,9 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { if (!first && rnd.nextBoolean()) { - Map<Integer, Integer> res = readAllByMode(cache.cache, map.keySet(), readMode, INTEGER_CODEC); + Map<Integer, Integer> res = readAllByMode(cache.cache, keys, readMode, INTEGER_CODEC); - for (Integer k : map.keySet()) + for (Integer k : keys) assertEquals("res=" + res, v - 1, (Object)res.get(k)); } @@ -917,14 +943,12 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { } if (rnd.nextBoolean()) { - Map<Integer, Integer> res = readAllByMode(cache.cache, map.keySet(), readMode, INTEGER_CODEC); + Map<Integer, Integer> res = readAllByMode(cache.cache, keys, readMode, INTEGER_CODEC); - for (Integer k : map.keySet()) + for (Integer k : keys) assertEquals("key=" + k, v, (Object)res.get(k)); } - map.clear(); - v++; } catch (Exception e) { @@ -933,6 +957,8 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { finally { cache.readUnlock(); + keys.clear(); + map.clear(); } } @@ -956,6 +982,8 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { int min = range * RANGE; int max = min + RANGE; + keys.clear(); + while (keys.size() < RANGE) keys.add(rnd.nextInt(min, max)); @@ -1003,8 +1031,6 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { } } } - - keys.clear(); } } }; @@ -1054,9 +1080,6 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { ) throws Exception { - if(readMode == SCAN && writeMode == PUT) - fail("https://issues.apache.org/jira/browse/IGNITE-7764"); - final int TOTAL = 20; assert N <= TOTAL; @@ -1071,7 +1094,7 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { @Override public void apply(IgniteCache<Object, Object> cache) { final IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); - Map<Integer, Integer> vals = new HashMap<>(); + Map<Integer, Integer> vals = new LinkedHashMap<>(); for (int i = 0; i < TOTAL; i++) vals.put(i, N); @@ -1341,6 +1364,9 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { while (System.currentTimeMillis() < stopTime && !stop.get()) { Thread.sleep(1000); + if (System.currentTimeMillis() >= stopTime || stop.get()) + break; + if (restartMode != null) { switch (restartMode) { case RESTART_CRD: { @@ -1806,12 +1832,15 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { } }); + Map res; - Map res = (Map)cache.query(scanQry).getAll() - .stream() - .collect(Collectors.toMap(v -> ((IgniteBiTuple)v).getKey(), v -> ((IgniteBiTuple)v).getValue())); + try (QueryCursor qry = cache.query(scanQry)) { + res = (Map)qry.getAll() + .stream() + .collect(Collectors.toMap(v -> ((IgniteBiTuple)v).getKey(), v -> ((IgniteBiTuple)v).getValue())); - assertTrue("res.size()=" + res.size() + ", keys.size()=" + keys.size(), res.size() <= keys.size()); + assertTrue("res.size()=" + res.size() + ", keys.size()=" + keys.size(), res.size() <= keys.size()); + } return res; @@ -1833,29 +1862,29 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { String qry = b.toString(); - SqlFieldsQuery sqlFieldsQry = new SqlFieldsQuery(qry); + SqlFieldsQuery sqlFieldsQry = new SqlFieldsQuery(qry); if (emulateLongQry) sqlFieldsQry.setLazy(true).setPageSize(1); List<List> rows; - if (emulateLongQry) { - FieldsQueryCursor<List> cur = cache.query(sqlFieldsQry); - - rows = new ArrayList<>(); + try (FieldsQueryCursor<List> cur = cache.query(sqlFieldsQry)) { + if (emulateLongQry) { + rows = new ArrayList<>(); - for (List row : cur) { - rows.add(row); + for (List row : cur) { + rows.add(row); - doSleep(ThreadLocalRandom.current().nextInt(50)); + doSleep(ThreadLocalRandom.current().nextInt(50)); + } } + else + rows = cur.getAll(); } - else - rows = cache.query(sqlFieldsQry).getAll(); if (rows.isEmpty()) - return Collections.EMPTY_MAP; + return Collections.emptyMap(); res = new HashMap(); @@ -1887,7 +1916,7 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { rows = cur.getAll(); if (rows.isEmpty()) - return Collections.EMPTY_MAP; + return Collections.emptyMap(); res = new HashMap(); @@ -2104,6 +2133,23 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + MvccTestAccount account = (MvccTestAccount)o; + return val == account.val && + updateCnt == account.updateCnt; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + + return Objects.hash(val, updateCnt); + } + + /** {@inheritDoc} */ @Override public String toString() { return "MvccTestAccount{" + "val=" + val +