IGNITE-3406 - Interceptor and continuous query get correct old value during rebalancing.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/147ab9c0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/147ab9c0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/147ab9c0 Branch: refs/heads/master Commit: 147ab9c08f6ac7edecf656b23d8b25bfab91becf Parents: c24caba Author: dkarachentsev <dkarachent...@gridgain.com> Authored: Mon Sep 19 13:58:41 2016 +0300 Committer: dkarachentsev <dkarachent...@gridgain.com> Committed: Mon Sep 19 13:58:41 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheEntryEx.java | 8 ++ .../processors/cache/GridCacheMapEntry.java | 9 +- .../GridDistributedTxRemoteAdapter.java | 6 ++ .../distributed/dht/GridDhtTxPrepareFuture.java | 36 ++++++- .../cache/transactions/IgniteTxEntry.java | 44 ++++++++- .../transactions/IgniteTxLocalAdapter.java | 8 ++ .../processors/cache/GridCacheTestEntryEx.java | 4 + .../IgniteCacheInterceptorSelfTestSuite.java | 5 + ...ContinuousQueryFailoverAbstractSelfTest.java | 99 ++++++++++++++++++++ 9 files changed, 213 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 616854f..ef6a244 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -358,6 +358,8 @@ public interface GridCacheEntryEx { * @param evt Flag to signal event notification. 
* @param metrics Flag to signal metrics update. * @param keepBinary Keep binary flag. + * @param oldValPresent {@code True} if oldValue present. + * @param oldVal Old value. * @param topVer Topology version. * @param filter Filter. * @param drType DR type. @@ -383,6 +385,8 @@ public interface GridCacheEntryEx { boolean evt, boolean metrics, boolean keepBinary, + boolean oldValPresent, + @Nullable CacheObject oldVal, AffinityTopologyVersion topVer, CacheEntryPredicate[] filter, GridDrType drType, @@ -402,6 +406,8 @@ public interface GridCacheEntryEx { * @param evt Flag to signal event notification. * @param metrics Flag to signal metrics notification. * @param keepBinary Keep binary flag. + * @param oldValPresent {@code True} if oldValue present. + * @param oldVal Old value. * @param topVer Topology version. * @param filter Filter. * @param drType DR type. @@ -422,6 +428,8 @@ public interface GridCacheEntryEx { boolean evt, boolean metrics, boolean keepBinary, + boolean oldValPresent, + @Nullable CacheObject oldVal, AffinityTopologyVersion topVer, CacheEntryPredicate[] filter, GridDrType drType, http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index c760ac1..a9ac1e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1141,6 +1141,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme boolean evt, boolean metrics, boolean keepBinary, + boolean oldValPresent, + @Nullable CacheObject oldVal, 
AffinityTopologyVersion topVer, CacheEntryPredicate[] filter, GridDrType drType, @@ -1198,7 +1200,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme Map<UUID, CacheContinuousQueryListener> lsnrCol = notifyContinuousQueries(tx) ? cctx.continuousQueries().updateListeners(internal, false) : null; - old = (retval || intercept || lsnrCol != null) ? + old = oldValPresent ? oldVal : + (retval || intercept || lsnrCol != null) ? rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : this.val; if (intercept) { @@ -1333,6 +1336,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme boolean evt, boolean metrics, boolean keepBinary, + boolean oldValPresent, + @Nullable CacheObject oldVal, AffinityTopologyVersion topVer, CacheEntryPredicate[] filter, GridDrType drType, @@ -1403,7 +1408,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme Map<UUID, CacheContinuousQueryListener> lsnrCol = notifyContinuousQueries(tx) ? cctx.continuousQueries().updateListeners(internal, false) : null; - old = (retval || intercept || lsnrCol != null) ? + old = oldValPresent ? oldVal : (retval || intercept || lsnrCol != null) ? 
rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : val; if (intercept) { http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index c56d1f7..9d9862a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -542,6 +542,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter true, true, txEntry.keepBinary(), + txEntry.hasOldValue(), + txEntry.oldValue(), topVer, null, replicate ? DR_BACKUP : DR_NONE, @@ -561,6 +563,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter true, true, txEntry.keepBinary(), + txEntry.hasOldValue(), + txEntry.oldValue(), topVer, null, replicate ? DR_BACKUP : DR_NONE, @@ -592,6 +596,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter true, true, txEntry.keepBinary(), + txEntry.hasOldValue(), + txEntry.oldValue(), topVer, null, replicate ? 
DR_BACKUP : DR_NONE, http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 1bdd9b8..ec73bff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -360,7 +360,12 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter boolean hasFilters = !F.isEmptyOrNulls(txEntry.filters()) && !F.isAlwaysTrue(txEntry.filters()); - if (hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM) { + CacheObject val; + CacheObject oldVal = null; + + boolean readOld = hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM; + + if (readOld) { cached.unswap(retVal); boolean readThrough = !txEntry.skipStore() && @@ -375,7 +380,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter final boolean keepBinary = txEntry.keepBinary(); - CacheObject val = cached.innerGet( + val = oldVal = cached.innerGet( null, tx, /*swap*/true, @@ -470,6 +475,33 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter else ret.success(txEntry.op() != DELETE || cached.hasValue()); } + + // Send old value if rebalancing is not finished. 
+ final boolean sndOldVal = !cacheCtx.isLocal() && !cacheCtx.topology().rebalanceFinished(tx.topologyVersion()); + + if (sndOldVal) { + if (oldVal == null && !readOld) { + oldVal = cached.innerGet( + null, + tx, + /*swap*/true, + /*readThrough*/false, + /*metrics*/false, + /*event*/false, + /*tmp*/false, + /*subjectId*/tx.subjectId(), + /*transformClo*/null, + /*taskName*/null, + /*expiryPlc*/null, + /*keepBinary*/true); + } + + if (oldVal != null) { + oldVal.prepareMarshal(cacheCtx.cacheObjectContext()); + + txEntry.oldValue(oldVal, true); + } + } } catch (IgniteCheckedException e) { U.error(log, "Failed to get result value for cache entry: " + cached, e); http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 87b2525..194208e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -115,6 +115,10 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { @GridDirectTransient private TxEntryValueHolder prevVal = new TxEntryValueHolder(); + /** Old value before update. */ + @GridToStringInclude + private TxEntryValueHolder oldVal = new TxEntryValueHolder(); + /** Transform. */ @GridToStringInclude @GridDirectTransient @@ -497,7 +501,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { } /** - * @param oldValOnPrimary {@code True} If old value for 'invoke' operation was non null on primary node. 
+ * @param oldValOnPrimary {@code True} if old value was non-null on primary node. */ public void oldValueOnPrimary(boolean oldValOnPrimary) { setFlag(oldValOnPrimary, OLD_VAL_ON_PRIMARY); @@ -583,6 +587,30 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { } /** + * @return Old value. + */ + @Nullable public CacheObject oldValue() { + return oldVal != null ? oldVal.value() : null; + } + + /** + * @param oldVal Old value. + */ + public void oldValue(CacheObject oldVal, boolean hasOldVal) { + if (this.oldVal == null) + this.oldVal = new TxEntryValueHolder(); + + this.oldVal.value(op(), oldVal, hasOldVal, hasOldVal); + } + + /** + * @return {@code True} if old value present. + */ + public boolean hasOldValue() { + return oldVal != null && oldVal.hasValue(); + } + + /** * @return {@code True} if has value explicitly set. */ public boolean hasValue() { @@ -1069,6 +1097,11 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { writer.incrementState(); + case 13: + if (!writer.writeMessage("oldVal", oldVal)) + return false; + + writer.incrementState(); } return true; @@ -1186,6 +1219,13 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { reader.incrementState(); + case 13: + oldVal = reader.readMessage("oldVal"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); } return reader.afterMessageRead(IgniteTxEntry.class); @@ -1198,7 +1238,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 13; + return 14; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index ee992cc..637f322 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -809,6 +809,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig evt, metrics, txEntry.keepBinary(), + txEntry.hasOldValue(), + txEntry.oldValue(), topVer, null, cached.detached() ? DR_NONE : drType, @@ -834,6 +836,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig false, metrics, txEntry.keepBinary(), + txEntry.hasOldValue(), + txEntry.oldValue(), topVer, CU.empty0(), DR_NONE, @@ -854,6 +858,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig evt, metrics, txEntry.keepBinary(), + txEntry.hasOldValue(), + txEntry.oldValue(), topVer, null, cached.detached() ? 
DR_NONE : drType, @@ -875,6 +881,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig false, metrics, txEntry.keepBinary(), + txEntry.hasOldValue(), + txEntry.oldValue(), topVer, CU.empty0(), DR_NONE, http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index 400fb14..bf543cb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -477,6 +477,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr boolean evt, boolean metrics, boolean keepBinary, + boolean hasOldVal, + @Nullable CacheObject oldVal, AffinityTopologyVersion topVer, CacheEntryPredicate[] filter, GridDrType drType, @@ -556,6 +558,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr boolean evt, boolean metrics, boolean keepBinary, + boolean oldValPresent, + @Nullable CacheObject oldVal, AffinityTopologyVersion topVer, CacheEntryPredicate[] filter, GridDrType drType, http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java index d19ecd7..17d88ae 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInterceptorSelfTestSuite.java @@ -58,6 +58,11 @@ public class IgniteCacheInterceptorSelfTestSuite extends TestSuite { suite.addTestSuite(CacheInterceptorPartitionCounterRandomOperationsTest.class); suite.addTestSuite(CacheInterceptorPartitionCounterLocalSanityTest.class); + suite.addTestSuite(GridCacheInterceptorAtomicRebalanceTest.class); + suite.addTestSuite(GridCacheInterceptorTransactionalRebalanceTest.class); + suite.addTestSuite(GridCacheInterceptorAtomicOffheapRebalanceTest.class); + suite.addTestSuite(GridCacheInterceptorTransactionalOffheapRebalanceTest.class); + return suite; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/147ab9c0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java index 083367c..1376be1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java @@ -57,6 +57,8 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheRebalanceMode; +import 
org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cache.query.QueryCursor; @@ -65,6 +67,7 @@ import org.apache.ignite.cluster.ClusterTopologyException; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.managers.communication.GridIoMessage; @@ -312,6 +315,102 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC } /** + * Test that during rebalancing correct old value passed to continuous query. + * + * @throws Exception If fail. + */ + public void testRebalance() throws Exception { + for (int iter = 0; iter < 5; iter++) { + log.info("Iteration: " + iter); + + final IgniteEx ignite = startGrid(1); + + final CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>("testCache"); + + ccfg.setAtomicityMode(atomicityMode()); + ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + ccfg.setCacheMode(cacheMode()); + ccfg.setRebalanceMode(CacheRebalanceMode.SYNC); + ccfg.setBackups(2); + + final IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(ccfg); + + final int KEYS = 10_000; + + for (int i = 0; i < KEYS; i++) + cache.put(i, i); + + final ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>(); + + final AtomicBoolean err = new AtomicBoolean(); + + final AtomicInteger cntr = new AtomicInteger(); + + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated( + final Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> cacheEntryEvts) { + try { + for (final CacheEntryEvent<? 
extends Integer, ? extends Integer> evt : cacheEntryEvts) { + final Integer oldVal = evt.getOldValue(); + + final Integer val = evt.getValue(); + + assertNotNull("No old value: " + evt, oldVal); + assertEquals("Unexpected old value: " + evt, (Integer)(oldVal + 1), val); + + cntr.incrementAndGet(); + } + } + catch (Throwable e) { + err.set(true); + + error("Unexpected error: " + e, e); + } + } + }); + + final QueryCursor<Cache.Entry<Integer, Integer>> cur = cache.query(qry); + + final CountDownLatch latch = new CountDownLatch(1); + + final IgniteInternalFuture<Object> updFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + latch.await(); + + for (int i = 0; i < KEYS && !err.get(); i++) + cache.put(i, i + 1); + + return null; + } + }); + + final IgniteInternalFuture<Object> rebFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + latch.await(); + + for (int i = 2; i <= 5 && !err.get(); i++) + startGrid(i); + + return null; + } + }); + + latch.countDown(); + + updFut.get(); + rebFut.get(); + + assertFalse("Unexpected error during test", err.get()); + + assertTrue(cntr.get() > 0); + + cur.close(); + + stopAllGrids(); + } + } + + /** * @param ignite Ignite. * @param topVer Topology version. * @throws Exception If failed.