IGNITE-7764: MVCC: cache API support. This closes #4725.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f7f834bf Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f7f834bf Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f7f834bf Branch: refs/heads/ignite-5797 Commit: f7f834bfaf8c4170ab852e829554c8ab5b373b77 Parents: 6f39115 Author: AMRepo <andrey.mashen...@gmail.com> Authored: Fri Sep 28 15:57:24 2018 +0300 Committer: devozerov <voze...@gridgain.com> Committed: Fri Sep 28 15:57:24 2018 +0300 ---------------------------------------------------------------------- .../ignite/codegen/MessageCodeGenerator.java | 6 +- .../communication/GridIoMessageFactory.java | 14 +- .../processors/cache/GridCacheAdapter.java | 29 +- .../processors/cache/GridCacheEntryEx.java | 12 +- .../processors/cache/GridCacheMapEntry.java | 164 +++-- .../cache/GridCacheUpdateTxResult.java | 23 +- .../cache/IgniteCacheOffheapManager.java | 24 +- .../cache/IgniteCacheOffheapManagerImpl.java | 48 +- .../processors/cache/IgniteCacheProxyImpl.java | 2 +- .../dht/GridDhtTransactionalCacheAdapter.java | 134 +++- .../dht/GridDhtTxAbstractEnlistFuture.java | 72 +- .../distributed/dht/GridDhtTxEnlistFuture.java | 147 ++++ .../dht/GridDhtTxQueryAbstractEnlistFuture.java | 83 +++ .../dht/GridDhtTxQueryEnlistFuture.java | 23 +- .../dht/GridDhtTxQueryResultsEnlistFuture.java | 37 +- .../cache/distributed/dht/GridDhtTxRemote.java | 4 + .../dht/GridPartitionedSingleGetFuture.java | 3 +- .../distributed/dht/NearTxResultHandler.java | 128 ++++ .../dht/colocated/GridDhtColocatedCache.java | 138 ++-- .../GridNearPessimisticTxPrepareFuture.java | 9 +- .../near/GridNearTxAbstractEnlistFuture.java | 20 +- .../near/GridNearTxEnlistFuture.java | 683 +++++++++++++++++++ .../near/GridNearTxEnlistRequest.java | 642 +++++++++++++++++ .../near/GridNearTxEnlistResponse.java | 372 ++++++++++ .../cache/distributed/near/GridNearTxLocal.java | 416 +++++++++-- .../GridNearTxQueryAbstractEnlistFuture.java | 36 + .../near/GridNearTxQueryEnlistFuture.java | 4 +- .../near/GridNearTxQueryEnlistResponse.java | 3 +- .../GridNearTxQueryResultsEnlistFuture.java | 5 +- .../GridNearTxQueryResultsEnlistResponse.java | 2 +- .../cache/mvcc/MvccProcessorImpl.java | 3 +- .../cache/mvcc/MvccQueryTrackerImpl.java | 10 +- .../persistence/GridCacheOffheapManager.java | 15 +- .../transactions/IgniteTxLocalAdapter.java | 5 +- .../cache/transactions/IgniteTxManager.java | 6 +- .../cache/tree/mvcc/data/MvccUpdateDataRow.java | 88 ++- .../cache/tree/mvcc/data/ResultType.java | 4 +- .../processors/cache/GridCacheTestEntryEx.java | 7 +- .../IgniteCacheTxIteratorSelfTest.java | 10 + ...vccAbstractBasicCoordinatorFailoverTest.java | 25 +- ...acheMvccAbstractCoordinatorFailoverTest.java | 21 - .../mvcc/CacheMvccAbstractFeatureTest.java | 2 +- .../cache/mvcc/CacheMvccAbstractTest.java | 132 ++-- .../cache/mvcc/CacheMvccTransactionsTest.java | 596 +++++++--------- .../DataStreamProcessorMvccSelfTest.java | 5 + .../configvariations/ConfigVariations.java | 1 - .../query/h2/DhtResultSetEnlistFuture.java | 4 +- .../query/h2/NearResultSetEnlistFuture.java | 3 - ...sactionsCommandsWithMvccEnabledSelfTest.java | 78 +-- ...cheMvccSelectForUpdateQueryAbstractTest.java | 2 + .../mvcc/CacheMvccSqlQueriesAbstractTest.java | 4 + .../mvcc/CacheMvccSqlTxQueriesAbstractTest.java | 15 +- .../mvcc/MvccRepeatableReadBulkOpsTest.java | 441 ++++++++++++ .../mvcc/MvccRepeatableReadOperationsTest.java | 276 ++++++++ .../testsuites/IgniteCacheMvccSqlTestSuite.java | 6 + .../ApiParity/IgniteConfigurationParityTest.cs | 5 +- 56 files changed, 4168 insertions(+), 879 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java ---------------------------------------------------------------------- diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java index 9512bae..bcb9ef4 100644 --- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java +++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java @@ -38,12 +38,14 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.UUID; -import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.GridCodegenConverter; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.IgniteCodeGeneratingFail; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistResponse; +import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; @@ -168,7 +170,7 @@ public class MessageCodeGenerator { // gen.generateAll(true); -// gen.generateAndWrite(GridNearTxQueryResultsEnlistRequest.class); + gen.generateAndWrite(GridNearTxEnlistResponse.class); // gen.generateAndWrite(GridNearAtomicUpdateRequest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 41c75be..389d8c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -110,6 +110,8 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLock import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; @@ -1066,7 +1068,17 @@ public class GridIoMessageFactory implements MessageFactory { break; - // [-3..119] [124..129] [-23..-27] [-36..-55]- this + case 159: + msg = new GridNearTxEnlistRequest(); + + break; + + case 160: + msg = new GridNearTxEnlistResponse(); + + break; + + // [-3..119] [124..129] [-23..-27] [-36..-55]- this // [120..123] - DR // [-4..-22, -30..-35] - SQL // [2048..2053] - Snapshots http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 476b083..cf9337b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -154,7 +154,9 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER; import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID; import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; /** * Adapter for different cache implementations. @@ -1940,6 +1942,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } if (tx == null || tx.implicit()) { + assert !ctx.mvccEnabled() || mvccSnapshot != null; + Map<KeyCacheObject, EntryGetResult> misses = null; Set<GridCacheEntryEx> newLocalEntries = null; @@ -1978,7 +1982,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V boolean skipEntry = readNoEntry; if (readNoEntry) { - CacheDataRow row = mvccSnapshot != null ? ctx.offheap().mvccRead(ctx, key, mvccSnapshot) : + CacheDataRow row = mvccSnapshot != null ? + ctx.offheap().mvccRead(ctx, key, mvccSnapshot) : ctx.offheap().read(ctx, key); if (row != null) { @@ -3411,7 +3416,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (keyCheck) validateCacheKeys(keys); - //TODO IGNITE-7764 + //TODO: IGNITE-9324: add explicit locks support. MvccUtils.verifyMvccOperationSupport(ctx, "Lock"); IgniteInternalFuture<Boolean> fut = lockAllAsync(keys, timeout); @@ -3442,7 +3447,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (keyCheck) validateCacheKey(key); - //TODO IGNITE-7764 + //TODO: IGNITE-9324: add explicit locks support. MvccUtils.verifyMvccOperationSupport(ctx, "Lock"); return lockAllAsync(Collections.singletonList(key), timeout); @@ -4213,11 +4218,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V true, op.single(), ctx.systemTx() ? ctx : null, - OPTIMISTIC, - READ_COMMITTED, + ctx.mvccEnabled() ? PESSIMISTIC : OPTIMISTIC, + ctx.mvccEnabled() ? REPEATABLE_READ : READ_COMMITTED, tCfg.getDefaultTxTimeout(), !ctx.skipStore(), - false, + ctx.mvccEnabled(), 0, null ); @@ -4315,11 +4320,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V true, op.single(), ctx.systemTx() ? ctx : null, - OPTIMISTIC, - READ_COMMITTED, + ctx.mvccEnabled() ? PESSIMISTIC : OPTIMISTIC, + ctx.mvccEnabled() ? REPEATABLE_READ : READ_COMMITTED, txCfg.getDefaultTxTimeout(), !skipStore, - false, + ctx.mvccEnabled(), 0, null); @@ -4996,11 +5001,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V true, op.single(), ctx.systemTx() ? ctx : null, - OPTIMISTIC, - READ_COMMITTED, + ctx.mvccEnabled() ? PESSIMISTIC : OPTIMISTIC, + ctx.mvccEnabled() ? REPEATABLE_READ : READ_COMMITTED, CU.transactionConfiguration(ctx, ctx.kernalContext().config()).getDefaultTxTimeout(), opCtx == null || !opCtx.skipStore(), - false, + ctx.mvccEnabled(), 0, null); http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 18fa820..2e96a9c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -352,6 +352,8 @@ public interface GridCacheEntryEx { * @param op Cache operation. * @param needHistory Whether to collect rows created or affected by the current tx. * @param noCreate Entry should not be created when enabled, e.g. SQL INSERT. + * @param filter Filter. + * @param retVal Previous value return flag. * @return Tuple containing success flag and old value. If success is {@code false}, * then value is {@code null}. * @throws IgniteCheckedException If storing value failed. @@ -366,7 +368,9 @@ public interface GridCacheEntryEx { MvccSnapshot mvccVer, GridCacheOperation op, boolean needHistory, - boolean noCreate) throws IgniteCheckedException, GridCacheEntryRemovedException; + boolean noCreate, + @Nullable CacheEntryPredicate filter, + boolean retVal) throws IgniteCheckedException, GridCacheEntryRemovedException; /** * @param tx Cache transaction. @@ -374,6 +378,8 @@ public interface GridCacheEntryEx { * @param topVer Topology version. * @param mvccVer Mvcc version. * @param needHistory Whether to collect rows created or affected by the current tx. + * @param filter Filter. + * @param retVal Previous value return flag. * @return Tuple containing success flag and old value. If success is {@code false}, * then value is {@code null}. * @throws IgniteCheckedException If storing value failed. @@ -384,7 +390,9 @@ public interface GridCacheEntryEx { UUID affNodeId, AffinityTopologyVersion topVer, MvccSnapshot mvccVer, - boolean needHistory) throws IgniteCheckedException, GridCacheEntryRemovedException; + boolean needHistory, + @Nullable CacheEntryPredicate filter, + boolean retVal) throws IgniteCheckedException, GridCacheEntryRemovedException; /** * @param tx Transaction adapter. http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 8fe559d..f58a3dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -67,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter; import org.apache.ignite.internal.processors.cache.transactions.TxCounters; +import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccUpdateDataRow; import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccUpdateResult; import org.apache.ignite.internal.processors.cache.tree.mvcc.data.ResultType; import org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry; @@ -1046,7 +1047,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme MvccSnapshot mvccVer, GridCacheOperation op, boolean needHistory, - boolean noCreate) throws IgniteCheckedException, GridCacheEntryRemovedException { + boolean noCreate, + CacheEntryPredicate filter, + boolean retVal) throws IgniteCheckedException, GridCacheEntryRemovedException { assert tx != null; final boolean valid = valid(tx.topologyVersion()); @@ -1087,7 +1090,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme assert val != null; res = cctx.offheap().mvccUpdate( - this, val, newVer, expireTime, mvccVer, tx.local(), needHistory, noCreate); + this, val, newVer, expireTime, mvccVer, tx.local(), needHistory, noCreate, filter, retVal); assert res != null; @@ -1100,7 +1103,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (res.resultType() == ResultType.VERSION_MISMATCH) throw new IgniteSQLException("Mvcc version mismatch.", CONCURRENT_UPDATE); - else if (noCreate && res.resultType() == ResultType.PREV_NULL) + else if (res.resultType() == ResultType.FILTERED || (noCreate && res.resultType() == ResultType.PREV_NULL)) return new GridCacheUpdateTxResult(false); else if (res.resultType() == ResultType.LOCKED) { unlockEntry(); @@ -1112,7 +1115,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme IgniteInternalFuture<?> lockFut = cctx.kernalContext().coordinators().waitFor(cctx, lockVer); lockFut.listen(new MvccUpdateLockListener(tx, this, affNodeId, topVer, val, ttl0, mvccVer, - op, needHistory, noCreate, resFut)); + op, needHistory, noCreate, filter, retVal, resFut)); return new GridCacheUpdateTxResult(false, resFut); } @@ -1141,17 +1144,18 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme counters.incrementUpdateCounter(cctx.cacheId(), partition()); } - if (cctx.group().persistenceEnabled() && cctx.group().walEnabled()) + if (cctx.group().persistenceEnabled() && cctx.group().walEnabled()) { logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry( - cctx.cacheId(), - key, - val, - res.resultType() == ResultType.PREV_NULL ? CREATE : UPDATE, - tx.nearXidVersion(), - newVer, - expireTime, - key.partition(), - 0L))); + cctx.cacheId(), + key, + val, + res.resultType() == ResultType.PREV_NULL ? CREATE : UPDATE, + tx.nearXidVersion(), + newVer, + expireTime, + key.partition(), + 0L))); + } update(val, expireTime, ttl, newVer, true); @@ -1172,6 +1176,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme GridCacheUpdateTxResult updRes = valid ? new GridCacheUpdateTxResult(true, 0L, logPtr) : new GridCacheUpdateTxResult(false, logPtr); + CacheDataRow oldRow = ((MvccUpdateDataRow)res).oldRow(); + + if(retVal && (res.resultType() == ResultType.PREV_NOT_NULL || res.resultType() == ResultType.VERSION_FOUND)) { + assert oldRow != null; + + updRes.prevValue(oldRow.value()); + } + updRes.mvccHistory(res.history()); return updRes; @@ -1183,7 +1195,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme UUID affNodeId, AffinityTopologyVersion topVer, MvccSnapshot mvccVer, - boolean needHistory) throws IgniteCheckedException, GridCacheEntryRemovedException { + boolean needHistory, + @Nullable CacheEntryPredicate filter, + boolean retVal) throws IgniteCheckedException, GridCacheEntryRemovedException { assert tx != null; assert mvccVer != null; @@ -1204,13 +1218,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme assert newVer != null : "Failed to get write version for tx: " + tx; - res = cctx.offheap().mvccRemove(this, mvccVer, tx.local(), needHistory); + res = cctx.offheap().mvccRemove(this, mvccVer, tx.local(), needHistory, filter, retVal); assert res != null; if (res.resultType() == ResultType.VERSION_MISMATCH) throw new IgniteSQLException("Mvcc version mismatch.", CONCURRENT_UPDATE); - else if (res.resultType() == ResultType.PREV_NULL) + else if (res.resultType() == ResultType.PREV_NULL || res.resultType() == ResultType.FILTERED) return new GridCacheUpdateTxResult(false); else if (res.resultType() == ResultType.LOCKED) { unlockEntry(); @@ -1222,7 +1236,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme IgniteInternalFuture<?> lockFut = cctx.kernalContext().coordinators().waitFor(cctx, lockVer); lockFut.listen(new MvccRemoveLockListener(tx, this, affNodeId, topVer, mvccVer, needHistory, - resFut)); + resFut, retVal, filter)); return new GridCacheUpdateTxResult(false, resFut); } @@ -1265,6 +1279,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme GridCacheUpdateTxResult updRes = valid ? new GridCacheUpdateTxResult(true, 0L, logPtr) : new GridCacheUpdateTxResult(false, logPtr); + CacheDataRow oldRow = ((MvccUpdateDataRow)res).oldRow(); + + if(retVal && (res.resultType() == ResultType.PREV_NOT_NULL || res.resultType() == ResultType.VERSION_FOUND)) { + assert oldRow != null; + + updRes.prevValue(oldRow.value()); + } + updRes.mvccHistory(res.history()); return updRes; @@ -2264,12 +2286,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (updateMetrics && updateRes.outcome().updateReadMetrics() && needVal) - cctx.cache().metrics0().onRead(oldVal != null); + cctx.cache().metrics0().onRead(oldVal != null); if (updateMetrics && INVOKE_NO_OP.equals(updateRes.outcome()) && (transformOp || updateRes.transformed())) cctx.cache().metrics0().onReadOnlyInvoke(oldVal != null); else if (updateMetrics && REMOVE_NO_VAL.equals(updateRes.outcome()) - && (transformOp || updateRes.transformed())) + && (transformOp || updateRes.transformed())) cctx.cache().metrics0().onInvokeRemove(oldVal != null); switch (updateRes.outcome()) { @@ -3521,11 +3543,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // Detach value before index update. val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx); - if (val != null) { - if (cctx.mvccEnabled()) - cctx.offheap().mvccInitialValue(this, val, newVer, expTime); - else - storeValue(val, expTime, newVer); + if (val != null) { + if (cctx.mvccEnabled()) + cctx.offheap().mvccInitialValue(this, val, newVer, expTime); + else + storeValue(val, expTime, newVer); if (deletedUnlocked()) deletedUnlocked(false); @@ -4157,12 +4179,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** - * Stores value inoffheap.* + * Stores value in off-heap. + * * @param val Value. * @param expireTime Expire time. * @param ver New entry version. * @param predicate Optional predicate. - * * @return {@code True} if storage was modified. * @throws IgniteCheckedException If update failed. */ @@ -4299,7 +4321,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (detached()) return rawGet(); - for (;;) { + for (; ; ) { GridCacheEntryEx e = cctx.cache().peekEx(key); if (e == null) @@ -4806,7 +4828,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (transformed) cctx.cache().metrics0().onInvokeRemove(hasOldVal); - } else if (op == READ && transformed) + } + else if (op == READ && transformed) cctx.cache().metrics0().onReadOnlyInvoke(hasOldVal); else { cctx.cache().metrics0().onWrite(); @@ -4940,6 +4963,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme /** */ private final GridFutureAdapter<GridCacheUpdateTxResult> resFut; + /** Need previous value flag. */ + private final boolean needVal; + + /** Filter. */ + private final CacheEntryPredicate filter; + /** */ private GridCacheMapEntry entry; @@ -4950,7 +4979,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme AffinityTopologyVersion topVer, MvccSnapshot mvccVer, boolean needHistory, - GridFutureAdapter<GridCacheUpdateTxResult> resFut) { + GridFutureAdapter<GridCacheUpdateTxResult> resFut, + boolean retVal, + @Nullable CacheEntryPredicate filter) { this.tx = tx; this.entry = entry; this.topVer = topVer; @@ -4958,6 +4989,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme this.mvccVer = mvccVer; this.needHistory = needHistory; this.resFut = resFut; + this.needVal = retVal; + this.filter = filter; } /** {@inheritDoc} */ @@ -4989,8 +5022,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme cctx.shared().database().checkpointReadLock(); try { - res = cctx.offheap().mvccRemove(entry, mvccVer, tx.local(), needHistory); - } finally { + res = cctx.offheap().mvccRemove(entry, mvccVer, tx.local(), needHistory, filter, needVal); + } + finally { cctx.shared().database().checkpointReadUnlock(); } @@ -5001,7 +5035,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme return; } - else if (res.resultType() == ResultType.PREV_NULL) { + else if (res.resultType() == ResultType.PREV_NULL || res.resultType() == ResultType.FILTERED) { resFut.onDone(new GridCacheUpdateTxResult(false)); return; @@ -5034,15 +5068,15 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (cctx.group().persistenceEnabled() && cctx.group().walEnabled()) logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry( - cctx.cacheId(), - entry.key(), - null, - DELETE, - tx.nearXidVersion(), - tx.writeVersion(), - 0, - entry.key().partition(), - 0))); + cctx.cacheId(), + entry.key(), + null, + DELETE, + tx.nearXidVersion(), + tx.writeVersion(), + 0, + entry.key().partition(), + 0))); entry.update(null, 0, 0, newVer, true); @@ -5209,6 +5243,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme /** */ private final boolean noCreate; + /** Filter. */ + private final CacheEntryPredicate filter; + + /** Need previous value flag.*/ + private final boolean needVal; + /** */ MvccUpdateLockListener(IgniteInternalTx tx, GridCacheMapEntry entry, @@ -5220,6 +5260,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme GridCacheOperation op, boolean needHistory, boolean noCreate, + CacheEntryPredicate filter, + boolean needVal, GridFutureAdapter<GridCacheUpdateTxResult> resFut) { this.tx = tx; this.entry = entry; @@ -5231,6 +5273,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme this.op = op; this.needHistory = needHistory; this.noCreate = noCreate; + this.filter = filter; + this.needVal = needVal; this.resFut = resFut; } @@ -5279,8 +5323,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme try { res = cctx.offheap().mvccUpdate( - entry, val, newVer, expireTime, mvccVer, tx.local(), needHistory, noCreate); - } finally { + entry, val, newVer, expireTime, mvccVer, tx.local(), needHistory, noCreate, filter, needVal); + } + finally { cctx.shared().database().checkpointReadUnlock(); } @@ -5329,15 +5374,15 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (cctx.group().persistenceEnabled() && cctx.group().walEnabled()) logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry( - cctx.cacheId(), - entry.key(), - val, - res.resultType() == ResultType.PREV_NULL ? CREATE : UPDATE, - tx.nearXidVersion(), - newVer, - expireTime, - entry.key().partition(), - 0L))); + cctx.cacheId(), + entry.key(), + val, + res.resultType() == ResultType.PREV_NULL ? CREATE : UPDATE, + tx.nearXidVersion(), + newVer, + expireTime, + entry.key().partition(), + 0L))); entry.update(val, expireTime, ttl, newVer, true); @@ -6007,8 +6052,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @Nullable IgniteBiTuple<Object, Exception> invokeRes, boolean readFromStore, boolean transformed) - throws IgniteCheckedException - { + throws IgniteCheckedException { GridCacheContext cctx = entry.context(); final CacheObject oldVal = entry.val; @@ -6059,7 +6103,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } else { - newSysTtl = newTtl = conflictCtx.ttl(); + newSysTtl = newTtl = conflictCtx.ttl(); newSysExpireTime = newExpireTime = conflictCtx.expireTime(); } @@ -6166,8 +6210,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @Nullable IgniteBiTuple<Object, Exception> invokeRes, boolean readFromStore, boolean transformed) - throws IgniteCheckedException - { + throws IgniteCheckedException { GridCacheContext cctx = entry.context(); CacheObject oldVal = entry.val; @@ -6264,8 +6307,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme private GridCacheVersionConflictContext<?, ?> resolveConflict( CacheObject newVal, @Nullable IgniteBiTuple<Object, Exception> invokeRes) - throws IgniteCheckedException - { + throws IgniteCheckedException { GridCacheContext cctx = entry.context(); // Cache is conflict-enabled. @@ -6437,7 +6479,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } catch (Exception e) { if (e instanceof UnregisteredClassException || e instanceof UnregisteredBinaryTypeException) - throw (IgniteException) e; + throw (IgniteException)e; writeObj = invokeEntry.valObj; http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java index b646cf9..4543dfd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java @@ -40,14 +40,17 @@ public class GridCacheUpdateTxResult { private GridLongList mvccWaitTxs; /** */ - private GridFutureAdapter<GridCacheUpdateTxResult> fut; + private GridFutureAdapter<GridCacheUpdateTxResult> fut; /** */ private WALPointer logPtr; - /** */ + /** Mvcc history. */ private List<MvccLinkAwareSearchRow> mvccHistory; + /** Previous value. */ + private CacheObject prevVal; + /** * Constructor. * @@ -158,6 +161,22 @@ public class GridCacheUpdateTxResult { this.mvccHistory = mvccHistory; } + /** + * + * @return Previous value. + */ + @Nullable public CacheObject prevValue() { + return prevVal; + } + + /** + * + * @param prevVal Previous value. + */ + public void prevValue( @Nullable CacheObject prevVal) { + this.prevVal = prevVal; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridCacheUpdateTxResult.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index b4b6c9b..f576cc5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -276,6 +276,8 @@ public interface IgniteCacheOffheapManager { * @param primary {@code True} if on primary node. * @param needHistory Flag to collect history. * @param noCreate Flag indicating that row should not be created if absent. + * @param filter Filter. + * @param retVal Flag to return previous value. * @return Update result. * @throws IgniteCheckedException If failed. */ @@ -287,13 +289,17 @@ public interface IgniteCacheOffheapManager { MvccSnapshot mvccSnapshot, boolean primary, boolean needHistory, - boolean noCreate) throws IgniteCheckedException; + boolean noCreate, + @Nullable CacheEntryPredicate filter, + boolean retVal) throws IgniteCheckedException; /** * @param entry Entry. * @param mvccSnapshot MVCC snapshot. * @param primary {@code True} if on primary node. * @param needHistory Flag to collect history. + * @param filter Filter. + * @param retVal Flag to return previous value. * @return Update result. * @throws IgniteCheckedException If failed. */ @@ -301,7 +307,9 @@ public interface IgniteCacheOffheapManager { GridCacheMapEntry entry, MvccSnapshot mvccSnapshot, boolean primary, - boolean needHistory) throws IgniteCheckedException; + boolean needHistory, + @Nullable CacheEntryPredicate filter, + boolean retVal) throws IgniteCheckedException; /** * @param entry Entry. @@ -788,9 +796,11 @@ public interface IgniteCacheOffheapManager { * @param ver Version. * @param expireTime Expire time. * @param mvccSnapshot MVCC snapshot. + * @param filter Filter. * @param primary {@code True} if update is executed on primary node. * @param needHistory Flag to collect history. * @param noCreate Flag indicating that row should not be created if absent. + * @param retVal Flag to return previous value. * @return Update result. * @throws IgniteCheckedException If failed. */ @@ -801,16 +811,20 @@ public interface IgniteCacheOffheapManager { GridCacheVersion ver, long expireTime, MvccSnapshot mvccSnapshot, + @Nullable CacheEntryPredicate filter, boolean primary, boolean needHistory, - boolean noCreate) throws IgniteCheckedException; + boolean noCreate, + boolean retVal) throws IgniteCheckedException; /** * @param cctx Cache context. * @param key Key. * @param mvccSnapshot MVCC snapshot. + * @param filter Filter. * @param primary {@code True} if update is executed on primary node. * @param needHistory Flag to collect history. + * @param retVal Flag to return previous value. * @return List of transactions to wait for. * @throws IgniteCheckedException If failed. */ @@ -818,8 +832,10 @@ public interface IgniteCacheOffheapManager { GridCacheContext cctx, KeyCacheObject key, MvccSnapshot mvccSnapshot, + @Nullable CacheEntryPredicate filter, boolean primary, - boolean needHistory) throws IgniteCheckedException; + boolean needHistory, + boolean retVal) throws IgniteCheckedException; /** * @param cctx Cache context. http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 11e67d3..e0b9c06 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -514,7 +514,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager MvccSnapshot mvccSnapshot, boolean primary, boolean needHistory, - boolean noCreate) throws IgniteCheckedException { + boolean noCreate, + @Nullable CacheEntryPredicate filter, + boolean retVal) throws IgniteCheckedException { if (entry.detached() || entry.isNear()) return null; @@ -526,9 +528,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager ver, expireTime, mvccSnapshot, + filter, primary, needHistory, - noCreate); + noCreate, + retVal); } /** {@inheritDoc} */ @@ -536,7 +540,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager GridCacheMapEntry entry, MvccSnapshot mvccSnapshot, boolean primary, - boolean needHistory) throws IgniteCheckedException { + boolean needHistory, + @Nullable CacheEntryPredicate filter, + boolean retVal) throws IgniteCheckedException { if (entry.detached() || entry.isNear()) return null; @@ -545,8 +551,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager return dataStore(entry.localPartition()).mvccRemove(entry.context(), entry.key(), mvccSnapshot, + filter, primary, - needHistory); + needHistory, + retVal); } /** {@inheritDoc} */ @@ -1848,9 +1856,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager GridCacheVersion ver, long expireTime, MvccSnapshot mvccSnapshot, + @Nullable CacheEntryPredicate filter, boolean primary, boolean needHistory, - boolean noCreate) throws IgniteCheckedException { + boolean noCreate, + boolean retVal) throws IgniteCheckedException { assert mvccSnapshot != null; assert primary || !needHistory; @@ -1866,7 +1876,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager key.valueBytes(coCtx); val.valueBytes(coCtx); - MvccUpdateDataRow updateRow = new MvccUpdateDataRow( + MvccUpdateDataRow updateRow = new MvccUpdateDataRow( cctx, key, val, @@ -1875,11 +1885,13 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager expireTime, mvccSnapshot, null, + filter, primary, false, needHistory, // we follow fast update visit flow here if row cannot be created by current operation - noCreate); + noCreate, + retVal); assert cctx.shared().database().checkpointLockIsHeldByThread(); @@ -1890,17 +1902,15 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager if (res == ResultType.LOCKED // cannot update locked || res == ResultType.VERSION_MISMATCH) // cannot update on write conflict return updateRow; - else if (res == ResultType.VERSION_FOUND) { + else if (res == ResultType.VERSION_FOUND || // exceptional case + res == ResultType.FILTERED || // Operation should be skipped. + (res == ResultType.PREV_NULL && noCreate) // No op. + ) { // Do nothing, except cleaning up not needed versions cleanup(cctx, updateRow.cleanupRows()); return updateRow; } - else if (res == ResultType.PREV_NULL && noCreate) { - cleanup(cctx, updateRow.cleanupRows()); - - return updateRow; - } CacheDataRow oldRow = null; @@ -1961,8 +1971,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager @Override public MvccUpdateResult mvccRemove(GridCacheContext cctx, KeyCacheObject key, MvccSnapshot mvccSnapshot, + @Nullable CacheEntryPredicate filter, boolean primary, - boolean needHistory) throws IgniteCheckedException { + boolean needHistory, + boolean retVal) throws IgniteCheckedException { assert mvccSnapshot != null; assert primary || mvccSnapshot.activeTransactions().size() == 0 : mvccSnapshot; assert primary || !needHistory; @@ -1987,10 +1999,12 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager 0, mvccSnapshot, null, + filter, primary, false, needHistory, - true); + true, + retVal); assert cctx.shared().database().checkpointLockIsHeldByThread(); @@ -2001,7 +2015,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager if (res == ResultType.LOCKED // cannot update locked || res == ResultType.VERSION_MISMATCH) // cannot update on write conflict return updateRow; - else if (res == ResultType.VERSION_FOUND) { + else if (res == ResultType.VERSION_FOUND || res == ResultType.FILTERED) { // Do nothing, except cleaning up not needed versions cleanup(cctx, updateRow.cleanupRows()); @@ -2051,9 +2065,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager 0, mvccSnapshot, null, + null, true, true, false, + false, false); assert cctx.shared().database().checkpointLockIsHeldByThread(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java index 225fa81..4989efb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java @@ -348,7 +348,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override public Lock lockAll(final Collection<? extends K> keys) { - //TODO IGNITE-7764 + //TODO: IGNITE-9324: add explicit locks support. MvccUtils.verifyMvccOperationSupport(ctx, "Lock"); return new CacheLockImpl<>(ctx.gate(), delegate, ctx.operationContextPerCall(), keys); http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 4480dae..52638c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -60,6 +60,9 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLock import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTransactionalCache; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistFuture; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistResponse; @@ -188,7 +191,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach ctx.io().addCacheHandler(ctx.cacheId(), GridNearTxQueryEnlistResponse.class, new CI2<UUID, GridNearTxQueryEnlistResponse>() { @Override public void apply(UUID nodeId, GridNearTxQueryEnlistResponse req) { - processNearEnlistResponse(nodeId, req); + processNearTxQueryEnlistResponse(nodeId, req); } }); @@ -216,7 +219,21 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach ctx.io().addCacheHandler(ctx.cacheId(), GridNearTxQueryResultsEnlistResponse.class, new CI2<UUID, GridNearTxQueryResultsEnlistResponse>() { @Override public void apply(UUID nodeId, GridNearTxQueryResultsEnlistResponse req) { - processNearTxEnlistResponse(nodeId, req); + processNearTxQueryResultsEnlistResponse(nodeId, req); + } + }); + + ctx.io().addCacheHandler(ctx.cacheId(), GridNearTxEnlistRequest.class, + new CI2<UUID, GridNearTxEnlistRequest>() { + @Override public void apply(UUID nodeId, GridNearTxEnlistRequest req) { + processNearTxEnlistRequest(nodeId, req); + } + }); + + ctx.io().addCacheHandler(ctx.cacheId(), GridNearTxEnlistResponse.class, + new CI2<UUID, GridNearTxEnlistResponse>() { + @Override public void apply(UUID nodeId, GridNearTxEnlistResponse msg) { + processNearTxEnlistResponse(nodeId, msg); } }); @@ -756,17 +773,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach /** * @param nodeId Node ID. - * @param res Response. - */ - private void processNearEnlistResponse(UUID nodeId, final GridNearTxQueryEnlistResponse res) { - GridNearTxQueryEnlistFuture fut = (GridNearTxQueryEnlistFuture)ctx.mvcc().versionedFuture(res.version(), res.futureId()); - - if (fut != null) - fut.onResult(nodeId, res); - } - - /** - * @param nodeId Node ID. * @param req Request. */ private void processNearLockRequest(UUID nodeId, GridNearLockRequest req) { @@ -801,7 +807,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach IgniteInternalFuture<?> f; if (req.firstClientRequest()) { - for (;;) { + for (; ; ) { if (waitForExchangeFuture(nearNode, req)) return; @@ -1079,9 +1085,9 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (top != null && needRemap(req.topologyVersion(), top.readyTopologyVersion())) { if (log.isDebugEnabled()) { log.debug("Client topology version mismatch, need remap lock request [" + - "reqTopVer=" + req.topologyVersion() + - ", locTopVer=" + top.readyTopologyVersion() + - ", req=" + req + ']'); + "reqTopVer=" + req.topologyVersion() + + ", locTopVer=" + top.readyTopologyVersion() + + ", req=" + req + ']'); } GridNearLockResponse res = sendClientLockRemapResponse(nearNode, @@ -1124,7 +1130,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (tx == null || !tx.init()) { String msg = "Failed to acquire lock (transaction has been completed): " + - req.version(); + req.version(); U.warn(log, msg); @@ -1401,7 +1407,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach int i = 0; - for (ListIterator<GridCacheEntryEx> it = entries.listIterator(); it.hasNext();) { + for (ListIterator<GridCacheEntryEx> it = entries.listIterator(); it.hasNext(); ) { GridCacheEntryEx e = it.next(); assert e != null; @@ -1995,6 +2001,71 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach } /** + * @param nodeId Node ID. + * @param req Request. + */ + private void processNearTxEnlistRequest(UUID nodeId, final GridNearTxEnlistRequest req) { + assert nodeId != null; + assert req != null; + + ClusterNode nearNode = ctx.discovery().node(nodeId); + + GridDhtTxLocal tx; + + try { + tx = initTxTopologyVersion(nodeId, + nearNode, + req.version(), + req.futureId(), + req.miniId(), + req.firstClientRequest(), + req.topologyVersion(), + req.threadId(), + req.txTimeout(), + req.subjectId(), + req.taskNameHash()); + } + catch (IgniteCheckedException | IgniteException ex) { + GridNearTxEnlistResponse res = new GridNearTxEnlistResponse(req.cacheId(), + req.futureId(), + req.miniId(), + req.version(), + ex); + + try { + ctx.io().send(nearNode, res, ctx.ioPolicy()); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send near enlist response [" + + "txId=" + req.version() + + ", node=" + nodeId + + ", res=" + res + ']', e); + } + + return; + } + + GridDhtTxEnlistFuture fut = new GridDhtTxEnlistFuture( + nodeId, + req.version(), + req.mvccSnapshot(), + req.threadId(), + req.futureId(), + req.miniId(), + tx, + req.timeout(), + ctx, + req.rows(), + req.operation(), + req.filter(), + req.needRes()); + + fut.listen(NearTxResultHandler.instance()); + + fut.init(); + } + + /** * @param nodeId Near node id. * @param nearNode Near node. * @param nearLockVer Near lock version. @@ -2125,7 +2196,30 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @param nodeId Node ID. * @param res Response. */ - private void processNearTxEnlistResponse(UUID nodeId, final GridNearTxQueryResultsEnlistResponse res) { + private void processNearTxEnlistResponse(UUID nodeId, final GridNearTxEnlistResponse res) { + GridNearTxEnlistFuture fut = (GridNearTxEnlistFuture) + ctx.mvcc().versionedFuture(res.version(), res.futureId()); + + if (fut != null) + fut.onResult(nodeId, res); + } + + /** + * @param nodeId Node ID. + * @param res Response. + */ + private void processNearTxQueryEnlistResponse(UUID nodeId, final GridNearTxQueryEnlistResponse res) { + GridNearTxQueryEnlistFuture fut = (GridNearTxQueryEnlistFuture)ctx.mvcc().versionedFuture(res.version(), res.futureId()); + + if (fut != null) + fut.onResult(nodeId, res); + } + + /** + * @param nodeId Node ID. + * @param res Response. + */ + private void processNearTxQueryResultsEnlistResponse(UUID nodeId, final GridNearTxQueryResultsEnlistResponse res) { GridNearTxQueryResultsEnlistFuture fut = (GridNearTxQueryResultsEnlistFuture) ctx.mvcc().versionedFuture(res.version(), res.futureId()); http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java index ad164e7..64f966d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection; +import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; @@ -77,11 +78,10 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; /** - * Abstract future processing transaction enlisting and locking - * of entries produced with DML and SELECT FOR UPDATE queries. + * Abstract future processing transaction enlisting and locking. */ -public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapter<Long> - implements DhtLockFuture<Long> { +public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAdapter<T> + implements DhtLockFuture<T> { /** Done field updater. */ private static final AtomicIntegerFieldUpdater<GridDhtTxAbstractEnlistFuture> DONE_UPD = AtomicIntegerFieldUpdater.newUpdater(GridDhtTxAbstractEnlistFuture.class, "done"); @@ -134,9 +134,6 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt /** */ protected final MvccSnapshot mvccSnapshot; - /** Processed entries count. */ - protected long cnt; - /** New DHT nodes. */ protected Set<UUID> newDhtNodes = Collections.newSetFromMap(new ConcurrentHashMap<>()); @@ -146,6 +143,9 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt /** Near lock version. */ protected final GridCacheVersion nearLockVer; + /** Filter. */ + private final CacheEntryPredicate filter; + /** Timeout object. */ @GridToStringExclude protected LockTimeoutObject timeoutObj; @@ -202,6 +202,7 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt * @param tx Transaction. * @param timeout Lock acquisition timeout. * @param cctx Cache context. + * @param filter Filter. */ protected GridDhtTxAbstractEnlistFuture(UUID nearNodeId, GridCacheVersion nearLockVer, @@ -212,7 +213,8 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt @Nullable int[] parts, GridDhtTxLocalAdapter tx, long timeout, - GridCacheContext<?, ?> cctx) { + GridCacheContext<?, ?> cctx, + @Nullable CacheEntryPredicate filter) { assert tx != null; assert timeout >= 0; assert nearNodeId != null; @@ -229,6 +231,7 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt this.timeout = timeout; this.tx = tx; this.parts = parts; + this.filter = filter; lockVer = tx.xidVersion(); @@ -238,12 +241,38 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt } /** + * Gets source to be updated iterator. + * * @return iterator. * @throws IgniteCheckedException If failed. */ protected abstract UpdateSourceIterator<?> createIterator() throws IgniteCheckedException; /** + * Gets query result. + * + * @return Query result. + */ + protected abstract T result0(); + + /** + * Gets need previous value flag. + * + * @return {@code True} if previous value is required. + */ + public boolean needResult() { + return false; + } + + /** + * Entry processed callback. + * + * @param key Entry key. + * @param res Update result. + */ + protected abstract void onEntryProcessed(KeyCacheObject key, GridCacheUpdateTxResult res); + + /** * */ public void init() { @@ -291,14 +320,14 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt boolean added = cctx.mvcc().addFuture(this, futId); - assert added; - if (isDone()) { cctx.mvcc().removeFuture(futId); return; } + assert added; + if (timeoutObj != null) cctx.time().addTimeoutObject(timeoutObj); @@ -310,12 +339,15 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt if (!it.hasNext()) { U.close(it, log); - onDone(0L); + onDone(result0()); return; } - tx.addActiveCache(cctx, false); + if(!tx.implicitSingle()) + tx.addActiveCache(cctx, false); + else // Nothing to do for single update. + assert tx.txState().cacheIds().contains(cctx.cacheId()) && tx.txState().cacheIds().size() == 1; this.it = it; } @@ -391,7 +423,9 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt cctx.localNodeId(), topVer, mvccSnapshot, - isMoving(key.partition())); + isMoving(key.partition()), + filter, + needResult()); break; @@ -407,7 +441,9 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt mvccSnapshot, op.cacheOperation(), isMoving(key.partition()), - op.noCreate()); + op.noCreate(), + filter, + needResult()); break; @@ -493,7 +529,7 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt } if (noPendingRequests()) { - onDone(cnt); + onDone(result0()); return; } @@ -569,11 +605,11 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt if (ptr0 != null) walPtr = ptr0; + onEntryProcessed(entry.key(), updRes); + if (!updRes.success()) return; - cnt++; - if (op != EnlistOperation.LOCK) addToBatch(entry.key(), val, updRes.mvccHistory(), entry.context().cacheId()); } @@ -980,7 +1016,7 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt } /** {@inheritDoc} */ - @Override public boolean onDone(@Nullable Long res, @Nullable Throwable err) { + @Override public boolean onDone(@Nullable T res, @Nullable Throwable err) { assert res != null || err != null; if (!DONE_UPD.compareAndSet(this, 0, 1)) http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxEnlistFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxEnlistFuture.java new file mode 100644 index 0000000..58d6b15 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxEnlistFuture.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import java.util.Collection; +import java.util.Iterator; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheReturn; +import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.query.EnlistOperation; +import org.apache.ignite.internal.processors.query.UpdateSourceIterator; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +/** + * Future processing transaction enlisting and locking of entries produces by cache API operations. + */ +public final class GridDhtTxEnlistFuture extends GridDhtTxAbstractEnlistFuture<GridCacheReturn> implements UpdateSourceIterator<Object> { + /** Enlist operation. */ + private EnlistOperation op; + + /** Source iterator. */ + private Iterator<Object> it; + + /** Future result. */ + private GridCacheReturn res; + + /** Need result flag. If {@code True} previous value should be returned as well. */ + private boolean needRes; + + /** + * Constructor. + * + * @param nearNodeId Near node ID. + * @param nearLockVer Near lock version. + * @param mvccSnapshot Mvcc snapshot. + * @param threadId Thread ID. + * @param nearFutId Near future id. + * @param nearMiniId Near mini future id. + * @param tx Transaction. + * @param timeout Lock acquisition timeout. + * @param cctx Cache context. + * @param rows Collection of rows. + * @param op Operation. + * @param filter Filter. + * @param needRes Return previous value flag. + */ + public GridDhtTxEnlistFuture(UUID nearNodeId, + GridCacheVersion nearLockVer, + MvccSnapshot mvccSnapshot, + long threadId, + IgniteUuid nearFutId, + int nearMiniId, + GridDhtTxLocalAdapter tx, + long timeout, + GridCacheContext<?, ?> cctx, + Collection<Object> rows, + EnlistOperation op, + @Nullable CacheEntryPredicate filter, + boolean needRes) { + super(nearNodeId, + nearLockVer, + mvccSnapshot, + threadId, + nearFutId, + nearMiniId, + null, + tx, + timeout, + cctx, + filter); + + this.op = op; + this.needRes = needRes; + + it = rows.iterator(); + + res = new GridCacheReturn(cctx.localNodeId().equals(nearNodeId), false); + + skipNearNodeUpdates = true; + } + + /** {@inheritDoc} */ + @Override protected UpdateSourceIterator<?> createIterator() throws IgniteCheckedException { + return this; + } + + /** {@inheritDoc} */ + @Override @Nullable protected GridCacheReturn result0() { + return res; + } + + /** {@inheritDoc} */ + @Override protected void onEntryProcessed(KeyCacheObject key, GridCacheUpdateTxResult txRes) { + if (needRes && txRes.success()) + res.set(cctx, txRes.prevValue(), txRes.success(), true); + else + res.success(txRes.success()); + } + + /** {@inheritDoc} */ + public boolean needResult() { + return needRes; + } + + /** {@inheritDoc} */ + @Override public EnlistOperation operation() { + return op; + } + + /** {@inheritDoc} */ + public boolean hasNextX() { + return it.hasNext(); + } + + /** {@inheritDoc} */ + public Object nextX() { + return it.next(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtTxEnlistFuture.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryAbstractEnlistFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryAbstractEnlistFuture.java new file mode 100644 index 0000000..0a26d75 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryAbstractEnlistFuture.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import java.util.UUID; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +/** + * Abstract future processing transaction enlisting and locking of entries produced with DML and SELECT FOR UPDATE + * queries. + */ +public abstract class GridDhtTxQueryAbstractEnlistFuture extends GridDhtTxAbstractEnlistFuture<Long> { + /** Processed entries count. */ + protected long cnt; + + /** + * Constructor. + * + * @param nearNodeId Near node ID. + * @param nearLockVer Near lock version. + * @param mvccSnapshot Mvcc snapshot. + * @param threadId Thread ID. + * @param nearFutId Near future id. + * @param nearMiniId Near mini future id. + * @param parts Partitions. + * @param tx Transaction. + * @param timeout Lock acquisition timeout. + * @param cctx Cache context. + */ + protected GridDhtTxQueryAbstractEnlistFuture(UUID nearNodeId, + GridCacheVersion nearLockVer, + MvccSnapshot mvccSnapshot, + long threadId, + IgniteUuid nearFutId, + int nearMiniId, + @Nullable int[] parts, + GridDhtTxLocalAdapter tx, + long timeout, + GridCacheContext<?, ?> cctx) { + super(nearNodeId, + nearLockVer, + mvccSnapshot, + threadId, + nearFutId, + nearMiniId, + null, + tx, + timeout, + cctx, null); + } + + /** {@inheritDoc} */ + @Override protected Long result0() { + return cnt; + } + + /** {@inheritDoc} */ + @Override protected void onEntryProcessed(KeyCacheObject key, GridCacheUpdateTxResult res) { + if(res.success()) + cnt++; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java index dd30855..ed792f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; -import java.util.Objects; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -29,9 +28,9 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; /** - * Cache lock future. + * Cache query lock future. */ -public final class GridDhtTxQueryEnlistFuture extends GridDhtTxAbstractEnlistFuture { +public final class GridDhtTxQueryEnlistFuture extends GridDhtTxQueryAbstractEnlistFuture { /** Involved cache ids. */ private final int[] cacheIds; @@ -116,24 +115,6 @@ public final class GridDhtTxQueryEnlistFuture extends GridDhtTxAbstractEnlistFut } /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - GridDhtTxQueryEnlistFuture future = (GridDhtTxQueryEnlistFuture)o; - - return Objects.equals(futId, future.futId); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return futId.hashCode(); - } - - /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridDhtTxQueryEnlistFuture.class, this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryResultsEnlistFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryResultsEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryResultsEnlistFuture.java index b3d15d4..c6140fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryResultsEnlistFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryResultsEnlistFuture.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.util.Collection; import java.util.Iterator; -import java.util.Objects; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -34,13 +33,11 @@ import org.apache.ignite.lang.IgniteUuid; * Future processing transaction enlisting and locking of entries * produces by complex DML queries with reduce step. */ -public final class GridDhtTxQueryResultsEnlistFuture extends GridDhtTxAbstractEnlistFuture implements UpdateSourceIterator<Object> { - /** */ - private static final long serialVersionUID = -4933550335145438798L; - /** */ +public final class GridDhtTxQueryResultsEnlistFuture extends GridDhtTxQueryAbstractEnlistFuture implements UpdateSourceIterator<Object> { + /** Enlist operation. */ private EnlistOperation op; - /** */ + /** Source iterator. */ private Iterator<Object> it; /** @@ -91,29 +88,6 @@ public final class GridDhtTxQueryResultsEnlistFuture extends GridDhtTxAbstractEn } /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - GridDhtTxQueryResultsEnlistFuture future = (GridDhtTxQueryResultsEnlistFuture)o; - - return Objects.equals(futId, future.futId); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return futId.hashCode(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridDhtTxQueryResultsEnlistFuture.class, this); - } - - /** {@inheritDoc} */ @Override public EnlistOperation operation() { return op; } @@ -127,4 +101,9 @@ public final class GridDhtTxQueryResultsEnlistFuture extends GridDhtTxAbstractEn @Override public Object nextX() { return it.next(); } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtTxQueryResultsEnlistFuture.class, this); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index 6662a1c..9883f6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -440,6 +440,8 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { ctx.localNodeId(), topologyVersion(), snapshot, + false, + null, false); break; @@ -456,6 +458,8 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { snapshot, op.cacheOperation(), false, + false, + null, false); break; http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index f5689f9..5d3bef2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -389,7 +389,8 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec boolean skipEntry = readNoEntry; if (readNoEntry) { - CacheDataRow row = mvccSnapshot != null ? cctx.offheap().mvccRead(cctx, key, mvccSnapshot) : + CacheDataRow row = mvccSnapshot != null ? + cctx.offheap().mvccRead(cctx, key, mvccSnapshot) : cctx.offheap().read(cctx, key); if (row != null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxResultHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxResultHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxResultHandler.java new file mode 100644 index 0000000..0bc00e1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/NearTxResultHandler.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheReturn; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistResponse; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.lang.GridClosureException; +import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteUuid; + +/** + * Response factory. + */ +public final class NearTxResultHandler implements CI1<IgniteInternalFuture<GridCacheReturn>> { + /** */ + private static final long serialVersionUID = 0; + + /** Singleton instance.*/ + private static final NearTxResultHandler INSTANCE = new NearTxResultHandler(); + + /** Constructor. */ + private NearTxResultHandler() { + } + + /** + * @return Handler instance. + */ + public static NearTxResultHandler instance() { + return INSTANCE; + } + + /** + * Response factory method. + * + * @param future Enlist future. + * @return Enlist response. + */ + @SuppressWarnings("unchecked") + public static <T> T createResponse(IgniteInternalFuture<?> future) { + assert future != null; + + Class<?> clazz = future.getClass(); + + if (clazz == GridDhtTxEnlistFuture.class) + return (T)createResponse((GridDhtTxEnlistFuture)future); + else + throw new IllegalStateException(); + } + + /** + * Response factory method. + * + * @param fut Enlist future. + * @return Enlist response. + */ + public static GridNearTxEnlistResponse createResponse(GridDhtTxEnlistFuture fut) { + try { + GridCacheReturn res = fut.get(); + + GridCacheVersion ver = null; + IgniteUuid id = null; + + if (fut.hasNearNodeUpdates) { + ver = fut.cctx.tm().mappedVersion(fut.nearLockVer); + + id = fut.futId; + } + + return new GridNearTxEnlistResponse(fut.cctx.cacheId(), fut.nearFutId, fut.nearMiniId, + fut.nearLockVer, res, ver, id, fut.newDhtNodes); + } + catch (IgniteCheckedException e) { + return new GridNearTxEnlistResponse(fut.cctx.cacheId(), fut.nearFutId, fut.nearMiniId, fut.nearLockVer, e); + } + } + + /** {@inheritDoc} */ + @Override public void apply(IgniteInternalFuture<GridCacheReturn> fut0) { + GridDhtTxAbstractEnlistFuture fut = (GridDhtTxAbstractEnlistFuture)fut0; + + GridCacheContext<?, ?> cctx = fut.cctx; + GridDhtTxLocal tx = (GridDhtTxLocal)fut.tx; + UUID nearNodeId = fut.nearNodeId; + + GridNearTxEnlistResponse res = createResponse(fut); + + try { + cctx.io().send(nearNodeId, res, cctx.ioPolicy()); + } + catch (IgniteCheckedException e) { + U.error(fut.log, "Failed to send near enlist response (will rollback transaction) [" + + "tx=" + CU.txString(tx) + + ", node=" + nearNodeId + + ", res=" + res + ']', e); + + try { + tx.rollbackDhtLocalAsync(); + } + catch (Throwable e1) { + e.addSuppressed(e1); + } + + throw new GridClosureException(e); + } + } +}