Repository: ignite Updated Branches: refs/heads/ignite-2.0 a5088265d -> a4c397076
ignite-4929 Fixed issue with incorrect return value on backup for one-phase tx invoke (anyway old value is sent on backups on changed topology, use this value on backup for invoke) Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a4c39707 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a4c39707 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a4c39707 Branch: refs/heads/ignite-2.0 Commit: a4c397076954d10cfb200fe30060ed0d118a3fc1 Parents: a508826 Author: sboikov <sboi...@gridgain.com> Authored: Thu Apr 20 10:26:09 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Apr 20 10:26:09 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheUtils.java | 9 - .../distributed/GridDistributedLockRequest.java | 14 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 26 +-- .../near/GridNearSingleGetRequest.java | 4 +- .../cache/transactions/IgniteTxAdapter.java | 16 +- .../cache/transactions/IgniteTxEntry.java | 39 ++-- .../IgfsMetaDirectoryListingAddProcessor.java | 5 +- .../internal/TestRecordingCommunicationSpi.java | 9 + .../cache/IgniteOnePhaseCommitInvokeTest.java | 213 +++++++++++++++++++ .../IgniteCachePutRetryAbstractSelfTest.java | 25 +-- ...gniteCachePutRetryTransactionalSelfTest.java | 2 +- .../atomic/IgniteCacheAtomicProtocolTest.java | 16 +- .../testframework/junits/GridAbstractTest.java | 9 +- .../junits/common/GridCommonAbstractTest.java | 79 +++++++ .../testsuites/IgniteCacheTestSuite2.java | 3 + 15 files changed, 360 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 51a95a6..5abb6de 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -196,15 +196,6 @@ public class GridCacheUtils { /** Expire time: must be calculated based on TTL value. */ public static final long EXPIRE_TIME_CALCULATE = -1L; - /** Skip store flag bit mask. */ - public static final int SKIP_STORE_FLAG_MASK = 0x1; - - /** Keep serialized flag. */ - public static final int KEEP_BINARY_FLAG_MASK = 0x2; - - /** Flag indicating that old value for 'invoke' operation was non null on primary node. */ - public static final int OLD_VAL_ON_PRIMARY = 0x4; - /** Empty predicate array. */ private static final IgnitePredicate[] EMPTY = new IgnitePredicate[0]; http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java index b1c2c27..74f34a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java @@ -38,9 +38,6 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.internal.processors.cache.GridCacheUtils.KEEP_BINARY_FLAG_MASK; -import static org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_STORE_FLAG_MASK; - /** * Lock request message. */ @@ -48,6 +45,12 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { /** */ private static final long serialVersionUID = 0L; + /** Skip store flag bit mask. */ + private static final int SKIP_STORE_FLAG_MASK = 0x01; + + /** Keep binary flag. */ + private static final int KEEP_BINARY_FLAG_MASK = 0x02; + /** Sender node ID. */ private UUID nodeId; @@ -90,10 +93,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { /** Key count. */ private int txSize; - /** - * Additional flags. - * GridCacheUtils.SKIP_STORE_FLAG_MASK - for skipStore flag value. - */ + /** Additional flags. */ private byte flags; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 464df6e..e2b7803 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -67,7 +67,6 @@ import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; -import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridLeanSet; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -373,8 +372,6 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite tx.nearOnOriginatingNode() || tx.hasInterceptor(); if (readOld) { - cached.unswap(retVal); - boolean readThrough = !txEntry.skipStore() && (txEntry.op() == TRANSFORM || ((retVal || hasFilters) && cacheCtx.config().isLoadPreviousValue())); @@ -482,7 +479,8 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite } // Send old value in case if rebalancing is not finished. - final boolean sndOldVal = !cacheCtx.isLocal() && !cacheCtx.topology().rebalanceFinished(tx.topologyVersion()); + final boolean sndOldVal = !cacheCtx.isLocal() && + !cacheCtx.topology().rebalanceFinished(tx.topologyVersion()); if (sndOldVal) { if (oldVal == null && !readOld) { @@ -499,11 +497,10 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite /*keepBinary*/true); } - if (oldVal != null) { + if (oldVal != null) oldVal.prepareMarshal(cacheCtx.cacheObjectContext()); - txEntry.oldValue(oldVal, true); - } + txEntry.oldValue(oldVal); } } catch (IgniteCheckedException e) { @@ -1532,21 +1529,6 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite ) { GridDistributedTxMapping global = globalMap.get(n.id()); - if (!F.isEmpty(entry.entryProcessors())) { - GridDhtPartitionState state = entry.context().topology().partitionState(n.id(), - entry.cached().partition()); - - if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) { - T2<GridCacheOperation, CacheObject> procVal = entry.entryProcessorCalculatedValue(); - - assert procVal != null : entry; - - entry.op(procVal.get1()); - entry.value(procVal.get2(), true, false); - entry.entryProcessors(null); - } - } - if (global == null) globalMap.put(n.id(), global = new GridDistributedTxMapping(n)); http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java index ab0afb1..0faa8ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java @@ -32,8 +32,6 @@ import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.NotNull; -import static org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_STORE_FLAG_MASK; - /** * */ @@ -212,7 +210,7 @@ public class GridNearSingleGetRequest extends GridCacheMessage implements GridCa * @return Read through flag. */ public boolean readThrough() { - return (flags & SKIP_STORE_FLAG_MASK) != 0; + return (flags & READ_THROUGH_FLAG_MASK) != 0; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index d3b39bd..5cba0cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -83,6 +83,7 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CRE import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.RELOAD; +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE; import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; @@ -1381,6 +1382,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement IgniteTxEntry txEntry, boolean metrics, @Nullable GridCacheReturn ret) throws GridCacheEntryRemovedException, IgniteCheckedException { + assert txEntry.op() != TRANSFORM || !F.isEmpty(txEntry.entryProcessors()) : txEntry; + GridCacheContext cacheCtx = txEntry.context(); assert cacheCtx != null; @@ -1404,18 +1407,25 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement final boolean keepBinary = txEntry.keepBinary(); - CacheObject cacheVal = txEntry.hasValue() ? txEntry.value() : - txEntry.cached().innerGet( + CacheObject cacheVal; + + if (txEntry.hasValue()) + cacheVal = txEntry.value(); + else if (txEntry.hasOldValue()) + cacheVal = txEntry.oldValue(); + else { + cacheVal = txEntry.cached().innerGet( null, this, /*read through*/false, /*metrics*/metrics, /*event*/recordEvt, /*subjId*/subjId, - /**closure name */recordEvt ? F.first(txEntry.entryProcessors()).get1() : null, + /*closure name */recordEvt ? F.first(txEntry.entryProcessors()).get1() : null, resolveTaskName(), null, keepBinary); + } boolean modified = false; http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 1f8a107..163ed99 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -37,7 +37,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.GridCacheUtils; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -57,9 +56,6 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; -import static org.apache.ignite.internal.processors.cache.GridCacheUtils.KEEP_BINARY_FLAG_MASK; -import static org.apache.ignite.internal.processors.cache.GridCacheUtils.OLD_VAL_ON_PRIMARY; -import static org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_STORE_FLAG_MASK; /** * Transaction entry. Note that it is essential that this class does not override @@ -83,6 +79,15 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** */ public static final GridCacheVersion GET_ENTRY_INVALID_VER_AFTER_GET = new GridCacheVersion(0, 0, 3); + /** Skip store flag bit mask. */ + private static final int TX_ENTRY_SKIP_STORE_FLAG_MASK = 0x01; + + /** Keep binary flag. */ + private static final int TX_ENTRY_KEEP_BINARY_FLAG_MASK = 0x02; + + /** Flag indicating that old value for 'invoke' operation was non null on primary node. */ + private static final int TX_ENTRY_OLD_VAL_ON_PRIMARY = 0x04; + /** Prepared flag updater. */ private static final AtomicIntegerFieldUpdater<IgniteTxEntry> PREPARED_UPD = AtomicIntegerFieldUpdater.newUpdater(IgniteTxEntry.class, "prepared"); @@ -194,13 +199,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** Expiry policy bytes. */ private byte[] expiryPlcBytes; - /** - * Additional flags: - * <ul> - * <li>{@link GridCacheUtils#SKIP_STORE_FLAG_MASK} - for skipStore flag value.</li> - * <li>{@link GridCacheUtils#KEEP_BINARY_FLAG_MASK} - for withKeepBinary flag.</li> - * </ul> - */ + /** Additional flags. */ private byte flags; /** Partition update counter. */ @@ -484,28 +483,28 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { * @param skipStore Skip store flag. */ public void skipStore(boolean skipStore) { - setFlag(skipStore, SKIP_STORE_FLAG_MASK); + setFlag(skipStore, TX_ENTRY_SKIP_STORE_FLAG_MASK); } /** * @return Skip store flag. */ public boolean skipStore() { - return isFlag(SKIP_STORE_FLAG_MASK); + return isFlag(TX_ENTRY_SKIP_STORE_FLAG_MASK); } /** * @param oldValOnPrimary {@code True} If old value for was non null on primary node. */ public void oldValueOnPrimary(boolean oldValOnPrimary) { - setFlag(oldValOnPrimary, OLD_VAL_ON_PRIMARY); + setFlag(oldValOnPrimary, TX_ENTRY_OLD_VAL_ON_PRIMARY); } /** * @return {@code True} If old value for 'invoke' operation was non null on primary node. */ - public boolean oldValueOnPrimary() { - return isFlag(OLD_VAL_ON_PRIMARY); + boolean oldValueOnPrimary() { + return isFlag(TX_ENTRY_OLD_VAL_ON_PRIMARY); } /** @@ -514,14 +513,14 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { * @param keepBinary Keep binary flag value. */ public void keepBinary(boolean keepBinary) { - setFlag(keepBinary, KEEP_BINARY_FLAG_MASK); + setFlag(keepBinary, TX_ENTRY_KEEP_BINARY_FLAG_MASK); } /** * @return Keep binary flag value. */ public boolean keepBinary() { - return isFlag(KEEP_BINARY_FLAG_MASK); + return isFlag(TX_ENTRY_KEEP_BINARY_FLAG_MASK); } /** @@ -588,11 +587,11 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** * @param oldVal Old value. */ - public void oldValue(CacheObject oldVal, boolean hasOldVal) { + public void oldValue(CacheObject oldVal) { if (this.oldVal == null) this.oldVal = new TxEntryValueHolder(); - this.oldVal.value(op(), oldVal, hasOldVal, hasOldVal); + this.oldVal.value(op(), oldVal, true, true); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingAddProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingAddProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingAddProcessor.java index 380d997..2e7ecae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingAddProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingAddProcessor.java @@ -80,17 +80,18 @@ public final class IgfsMetaDirectoryListingAddProcessor implements EntryProcesso @Override public Void process(MutableEntry<IgniteUuid, IgfsEntryInfo> e, Object... args) { IgfsEntryInfo fileInfo = e.getValue(); - assert fileInfo.isDirectory(); + assert fileInfo != null && fileInfo.isDirectory() : fileInfo; Map<String, IgfsListingEntry> listing = new HashMap<>(fileInfo.listing()); // Modify listing in-place. IgfsListingEntry oldEntry = listing.put(fileName, entry); - if (oldEntry != null && !oldEntry.fileId().equals(entry.fileId())) + if (oldEntry != null && !oldEntry.fileId().equals(entry.fileId())) { throw new IgniteException("Directory listing contains unexpected file" + " [listing=" + listing + ", fileName=" + fileName + ", entry=" + entry + ", oldEntry=" + oldEntry + ']'); + } e.setValue(fileInfo.listing(listing)); http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java index c3d26b7..aa0cc09 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java @@ -24,6 +24,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.managers.communication.GridIoMessage; @@ -60,6 +61,14 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { /** */ private IgnitePredicate<GridIoMessage> blockP; + /** + * @param node Node. + * @return Test SPI. + */ + public static TestRecordingCommunicationSpi spi(Ignite node) { + return (TestRecordingCommunicationSpi)node.configuration().getCommunicationSpi(); + } + /** {@inheritDoc} */ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitInvokeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitInvokeTest.java new file mode 100644 index 0000000..601c067 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitInvokeTest.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.concurrent.Callable; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteOnePhaseCommitInvokeTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** */ + private static final String CACHE_NAME = "testCache"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi(); + + cfg.setCommunicationSpi(commSpi); + + cfg.setClientMode(client); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(CACHE_NAME); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setBackups(1); + ccfg.setRebalanceMode(ASYNC); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** + * @throws Exception If failed. + */ + public void testOnePhaseInvoke() throws Exception { + boolean flags[] = {true, false}; + + for (boolean withOldVal : flags) { + for (boolean setVal : flags) { + for (boolean retPrev : flags) { + onePhaseInvoke(withOldVal, setVal, retPrev); + + stopAllGrids(); + } + } + } + } + + /** + * @param withOldVal If {@code true} + * @param setVal Flag whether set value from entry processor. + * @param retPrev Flag whether entry processor should return previous value. + * @throws Exception If failed. + */ + private void onePhaseInvoke(final boolean withOldVal, + final boolean setVal, + final boolean retPrev) + throws Exception + { + log.info("Test onePhaseInvoke [withOldVal=" + withOldVal + ", setVal=" + setVal + ", retPrev=" + retPrev + ']'); + + Ignite srv0 = startGrid(0); + + if (withOldVal) + srv0.cache(CACHE_NAME).put(1, 1); + + client = true; + + final Ignite clientNode = startGrid(1); + + TestRecordingCommunicationSpi.spi(srv0).blockMessages(new IgnitePredicate<GridIoMessage>() { + @Override public boolean apply(GridIoMessage msg0) { + Message msg = msg0.message(); + + return msg instanceof GridDhtPartitionSupplyMessage && + ((GridDhtPartitionSupplyMessage)msg).cacheId() == CU.cacheId(CACHE_NAME); + } + }); + + client = false; + + Ignite srv1 = startGrid(2); + + TestRecordingCommunicationSpi.spi(srv1).blockMessages(GridDhtTxPrepareResponse.class, srv0.name()); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + Object res = clientNode.cache(CACHE_NAME).invoke(1, new TestEntryProcessor(setVal, retPrev)); + + Object expRes; + + if (retPrev) + expRes = withOldVal ? 1 : null; + else + expRes = null; + + assertEquals(expRes, res); + + return null; + } + }); + + U.sleep(1000); + + stopGrid(0); + + fut.get(); + + if (!setVal) + checkCacheData(F.asMap(1, null), CACHE_NAME); + else { + Object expVal; + + if (setVal) + expVal = 2; + else + expVal = withOldVal ? 1 : null; + + checkCacheData(F.asMap(1, expVal), CACHE_NAME); + } + + checkOnePhaseCommitReturnValuesCleaned(-1); + } + + /** + * + */ + static class TestEntryProcessor implements CacheEntryProcessor { + /** */ + private final boolean setVal; + + /** */ + private final boolean retPrev; + + /** + * @param setVal Set value flag. + * @param retPrev Return previous value flag. + */ + TestEntryProcessor(boolean setVal, boolean retPrev) { + this.setVal = setVal; + this.retPrev = retPrev; + } + + /** {@inheritDoc} */ + @Override public Object process(MutableEntry e, Object... args) { + Object val = e.getValue(); + + if (setVal) + e.setValue(2); + + return retPrev ? val : null; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java index ca55a47..abec33c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java @@ -443,13 +443,13 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst private void checkInternalCleanup() throws Exception{ checkNoAtomicFutures(); - checkOnePhaseCommitReturnValuesCleaned(); + checkOnePhaseCommitReturnValuesCleaned(GRID_CNT); } /** * @throws Exception If failed. */ - void checkNoAtomicFutures() throws Exception { + private void checkNoAtomicFutures() throws Exception { for (int i = 0; i < GRID_CNT; i++) { final IgniteKernal ignite = (IgniteKernal)grid(i); @@ -468,27 +468,6 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst /** * @throws Exception If failed. */ - void checkOnePhaseCommitReturnValuesCleaned() throws Exception { - U.sleep(DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT); - - for (int i = 0; i < GRID_CNT; i++) { - IgniteKernal ignite = (IgniteKernal)grid(i); - - IgniteTxManager tm = ignite.context().cache().context().tm(); - - Map completedVersHashMap = U.field(tm, "completedVersHashMap"); - - for (Object o : completedVersHashMap.values()) { - assertTrue("completedVersHashMap contains" + o.getClass() + " instead of boolean. " + - "These values should be replaced by boolean after onePhaseCommit finished. " + - "[node=" + i + "]", o instanceof Boolean); - } - } - } - - /** - * @throws Exception If failed. - */ public void testFailsWithNoRetries() throws Exception { checkFailsWithNoRetries(false); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java index b439bcc..8e4b3a4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java @@ -260,7 +260,7 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr fut2.get(); } - checkOnePhaseCommitReturnValuesCleaned(); + checkOnePhaseCommitReturnValuesCleaned(GRID_CNT); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java index dfb3f65..29d67e2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java @@ -763,21 +763,7 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest { * @param expData Expected cache data. */ private void checkData(Map<Integer, Integer> expData) { - assert !expData.isEmpty(); - - List<Ignite> nodes = G.allGrids(); - - assertFalse(nodes.isEmpty()); - - for (Ignite node : nodes) { - IgniteCache<Integer, Integer> cache = node.cache(TEST_CACHE); - - for (Map.Entry<Integer, Integer> e : expData.entrySet()) { - assertEquals("Invalid value [key=" + e.getKey() + ", node=" + node.name() + ']', - e.getValue(), - cache.get(e.getKey())); - } - } + checkCacheData(expData, TEST_CACHE); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index af623da..8a7150d 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -2024,10 +2024,9 @@ public abstract class GridAbstractTest extends TestCase { } /** - * - * @throws IgniteInterruptedCheckedException + * @throws IgniteInterruptedCheckedException If interrupted. */ - public void awaitTopologyChange() throws IgniteInterruptedCheckedException { + private void awaitTopologyChange() throws IgniteInterruptedCheckedException { for (Ignite g : G.allGrids()) { final GridKernalContext ctx = ((IgniteKernal)g).context(); @@ -2038,7 +2037,9 @@ public abstract class GridAbstractTest extends TestCase { AffinityTopologyVersion exchVer = ctx.cache().context().exchange().readyAffinityVersion(); if (! topVer.equals(exchVer)) { - info("topology version mismatch: node " + g.name() + " " + exchVer + ", " + topVer); + info("Topology version mismatch [node=" + g.name() + + ", exchVer=" + exchVer + + ", topVer=" + topVer + ']'); GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index cef35e5..c76c83e 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -80,7 +80,9 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; import org.apache.ignite.internal.processors.cache.local.GridLocalCache; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.PA; @@ -1577,4 +1579,81 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { .setEnforceJoinOrder(qry.isEnforceJoinOrder())) .getAll().get(0).get(0); } + + /** + * @param expData Expected cache data. + * @param cacheName Cache name. + */ + protected final void checkCacheData(Map<?, ?> expData, String cacheName) { + assert !expData.isEmpty(); + + List<Ignite> nodes = G.allGrids(); + + assertFalse(nodes.isEmpty()); + + for (Ignite node : nodes) { + IgniteCache<Object, Object> cache = node.cache(cacheName); + + for (Map.Entry<?, ?> e : expData.entrySet()) { + assertEquals("Invalid value [key=" + e.getKey() + ", node=" + node.name() + ']', + e.getValue(), + cache.get(e.getKey())); + } + } + } + + /** + * @param nodesCnt Expected nodes number or {@code -1} to use all nodes. + * @throws Exception If failed. + */ + protected final void checkOnePhaseCommitReturnValuesCleaned(final int nodesCnt) throws Exception { + final List<Ignite> nodes; + + if (nodesCnt == -1) { + nodes = G.allGrids(); + + assertTrue(nodes.size() > 0); + } + else { + nodes = new ArrayList<>(nodesCnt); + + for (int i = 0; i < nodesCnt; i++) + nodes.add(grid(i)); + } + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + for (Ignite node : nodes) { + Map completedVersHashMap = completedTxsMap(node); + + for (Object o : completedVersHashMap.values()) { + if (!(o instanceof Boolean)) + return false; + } + } + + return true; + } + }, 5000); + + for (Ignite node : nodes) { + Map completedVersHashMap = completedTxsMap(node); + + for (Object o : completedVersHashMap.values()) { + assertTrue("completedVersHashMap contains " + o.getClass().getName() + " instead of boolean. " + + "These values should be replaced by boolean after onePhaseCommit finished. " + + "[node=" + node.name() + "]", o instanceof Boolean); + } + } + } + + /** + * @param ignite Node. + * @return Completed txs map. + */ + private Map completedTxsMap(Ignite ignite) { + IgniteTxManager tm = ((IgniteKernal)ignite).context().cache().context().tm(); + + return U.field(tm, "completedVersHashMap"); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a4c39707/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java index 62e6b78..89e8f01 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheEntryProcessorNode import org.apache.ignite.internal.processors.cache.IgniteCacheIncrementTxTest; import org.apache.ignite.internal.processors.cache.IgniteCachePartitionMapUpdateTest; import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheAndNodeStop; +import org.apache.ignite.internal.processors.cache.IgniteOnePhaseCommitInvokeTest; import org.apache.ignite.internal.processors.cache.MemoryPolicyConfigValidationTest; import org.apache.ignite.internal.processors.cache.distributed.CacheLoadingConcurrentGridStartSelfTest; import org.apache.ignite.internal.processors.cache.distributed.CacheLoadingConcurrentGridStartSelfTestAllowOverwrite; @@ -262,6 +263,8 @@ public class IgniteCacheTestSuite2 extends TestSuite { suite.addTest(new TestSuite(GridNearCacheStoreUpdateTest.class)); + suite.addTest(new TestSuite(IgniteOnePhaseCommitInvokeTest.class)); + return suite; } }