This is an automated email from the ASF dual-hosted git repository. gvvinblade pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 5e9df2a IGNITE-12350: MVCC activated and causing memory leak (OOM) despite no mvccEnabled caches. This closes #8152 5e9df2a is described below commit 5e9df2ad4ea9c6e8276a672756edcff872fcf226 Author: Vladislav Pyatkov <vldpyat...@gmail.com> AuthorDate: Sun Aug 16 01:27:05 2020 +0300 IGNITE-12350: MVCC activated and causing memory leak (OOM) despite no mvccEnabled caches. This closes #8152 --- .../org/apache/ignite/internal/IgniteFeatures.java | 6 +- .../processors/cache/mvcc/MvccProcessorImpl.java | 36 +++--- .../cache/transactions/IgniteTxManager.java | 15 ++- .../cache/WalModeChangeAdvancedSelfTest.java | 4 +- .../cache/mvcc/MvccStructuresOverheadTest.java | 128 +++++++++++++++++++++ .../testsuites/IgniteCacheMvccTestSuite.java | 4 +- 6 files changed, 173 insertions(+), 20 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java index 2a2930e..4196435 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal; import java.util.BitSet; + import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterState; import org.apache.ignite.internal.managers.encryption.GridEncryptionManager; @@ -117,7 +118,10 @@ public enum IgniteFeatures { SHUTDOWN_POLICY(40), /** Force rebuild, list or request indexes rebuild status from control script. */ - INDEXES_MANIPULATIONS_FROM_CONTROL_SCRIPT(42); + INDEXES_MANIPULATIONS_FROM_CONTROL_SCRIPT(42), + + /** Optimization of recovery protocol for cluster which doesn't contain MVCC caches. */ + MVCC_TX_RECOVERY_PROTOCOL_V2(44); /** * Unique feature identifier. 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java index 7fcc291..472c681 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java @@ -34,6 +34,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.Stream; + import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; @@ -506,25 +507,27 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce // 2. Notify previous queries. prevQueries.onNodeFailed(nodeId); - // 3. Recover transactions started by the failed node. - recoveryBallotBoxes.forEach((nearNodeId, ballotBox) -> { - // Put synthetic vote from another failed node - ballotBox.vote(nodeId); + if (mvccEnabled) { + // 3. Recover transactions started by the failed node. + recoveryBallotBoxes.forEach((nearNodeId, ballotBox) -> { + // Put synthetic vote from another failed node + ballotBox.vote(nodeId); - tryFinishRecoveryVoting(nearNodeId, ballotBox); - }); + tryFinishRecoveryVoting(nearNodeId, ballotBox); + }); - if (evt.eventNode().isClient()) { - RecoveryBallotBox ballotBox = recoveryBallotBoxes - .computeIfAbsent(nodeId, uuid -> new RecoveryBallotBox()); + if (evt.eventNode().isClient()) { + RecoveryBallotBox ballotBox = recoveryBallotBoxes + .computeIfAbsent(nodeId, uuid -> new RecoveryBallotBox()); - ballotBox.voters(evt.topologyNodes().stream() - // Nodes not supporting MVCC will never send votes to us. So, filter them away. 
- .filter(this::supportsMvcc) - .map(ClusterNode::id) - .collect(Collectors.toList())); + ballotBox.voters(evt.topologyNodes().stream() + // Nodes not supporting MVCC will never send votes to us. So, filter them away. + .filter(this::supportsMvcc) + .map(ClusterNode::id) + .collect(Collectors.toList())); - tryFinishRecoveryVoting(nodeId, ballotBox); + tryFinishRecoveryVoting(nodeId, ballotBox); + } } } } @@ -1858,6 +1861,9 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce * @param msg Message. */ private void processRecoveryFinishedMessage(UUID nodeId, MvccRecoveryFinishedMessage msg) { + if (!mvccEnabled) + return; + UUID nearNodeId = msg.nearNodeId(); RecoveryBallotBox ballotBox = recoveryBallotBoxes.computeIfAbsent(nearNodeId, uuid -> new RecoveryBallotBox()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 868d52d..706478f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; + import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteException; @@ -3239,8 +3240,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { ", failedNodeId=" + evtNodeId + ']'); // Null means that recovery voting is not needed. 
- GridCompoundFuture<IgniteInternalTx, Void> allTxFinFut = - node.isClient() && mvccCrd != null && mvccCrd.nodeId() != null + GridCompoundFuture<IgniteInternalTx, Void> allTxFinFut = isMvccRecoveryMessageRequired() ? new GridCompoundFuture<>() : null; for (final IgniteInternalTx tx : activeTransactions()) { @@ -3322,6 +3322,17 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** + * Determines whether a recovery message needs to be sent. + * + * @return True if the message is required, false otherwise. + */ + private boolean isMvccRecoveryMessageRequired() { + return node.isClient() && mvccCrd != null && mvccCrd.nodeId() != null && + (cctx.kernalContext().coordinators().mvccEnabled() || + !IgniteFeatures.nodeSupports(cctx.node(mvccCrd.nodeId()), IgniteFeatures.MVCC_TX_RECOVERY_PROTOCOL_V2)); + } + + /** * @param tx Tx. * @param failedNode Failed node. */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WalModeChangeAdvancedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WalModeChangeAdvancedSelfTest.java index ba85c9f..175f95b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WalModeChangeAdvancedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WalModeChangeAdvancedSelfTest.java @@ -23,6 +23,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; + import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; @@ -333,7 +334,8 @@ public class WalModeChangeAdvancedSelfTest extends WalModeChangeCommonAbstractSe String msg = e.getMessage(); assert msg.startsWith("Client node disconnected") || - msg.startsWith("Client node was disconnected") : e.getMessage(); + msg.startsWith("Client node was disconnected") || +
msg.contains("client is disconnected") : e.getMessage(); } finally { state = !state; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccStructuresOverheadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccStructuresOverheadTest.java new file mode 100644 index 0000000..4c93677 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/MvccStructuresOverheadTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.GridTopic; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +/** + * Test checks that unused MVCC structures, which could create GC pressure, are not left behind. + */ +public class MvccStructuresOverheadTest extends GridCommonAbstractTest { + + /** + * Amount of restarts of clients. + */ + private static final int CLIENT_RESTARTS = 10; + + /** + * Whether the cache is configured as MVCC or not. + */ + private boolean isMvccCache = false; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName) + .setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME) + .setAtomicityMode(isMvccCache ? + CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT : + CacheAtomicityMode.ATOMIC)); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stopAllGrids(); + } + + /** + * Starts grid with ATOMIC cache. + * + * @throws Exception If failed. + */ + @Test + public void testWithoutMvcc() throws Exception { + restartClients(); + } + + /** + * Starts grid with MVCC cache. + * + * @throws Exception If failed.
+ */ + @Test + public void testWithMvcc() throws Exception { + isMvccCache = true; + + restartClients(); + } + + /** + * Starts cluster and restarts several clients over it. + * + * @throws Exception If failed. + */ + private void restartClients() throws Exception { + IgniteEx ignite = startGrid(0); + + AtomicBoolean mvccMessageTranslated = new AtomicBoolean(); + + ignite.context().io().addMessageListener(GridTopic.TOPIC_CACHE_COORDINATOR, (nodeId, msg, plc) -> { + if (msg instanceof MvccRecoveryFinishedMessage) + mvccMessageTranslated.set(true); + }); + + Map recoveryBallotBoxes = U.field(ignite.context().coordinators(), "recoveryBallotBoxes"); + + for (int i = 0; i < CLIENT_RESTARTS; i++) { + IgniteEx client = startClientGrid(1); + + IgniteCache cache = client.cache(DEFAULT_CACHE_NAME); + + cache.put(i, i); + + client.close(); + + if (isMvccCache) { + assertTrue(GridTestUtils.waitForCondition(mvccMessageTranslated::get, 10_000)); + + assertTrue("Size of recoveryBallotBoxes " + recoveryBallotBoxes.size(), recoveryBallotBoxes.isEmpty()); + + mvccMessageTranslated.compareAndSet(true, false); + } + else { + assertFalse(mvccMessageTranslated.get()); + + assertTrue("Size of recoveryBallotBoxes " + recoveryBallotBoxes.size(), recoveryBallotBoxes.isEmpty()); + } + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java index a7d15aa..4689006 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTransactionsTes import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxFailoverTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccVacuumTest; import 
org.apache.ignite.internal.processors.cache.mvcc.MvccCachePeekTest; +import org.apache.ignite.internal.processors.cache.mvcc.MvccStructuresOverheadTest; import org.apache.ignite.internal.processors.cache.mvcc.MvccUnsupportedTxModesTest; import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorMvccPersistenceSelfTest; import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorMvccSelfTest; @@ -72,7 +73,8 @@ import org.junit.runners.Suite; CacheMvccPartitionedCoordinatorFailoverTest.class, CacheMvccReplicatedCoordinatorFailoverTest.class, CacheMvccProcessorLazyStartTest.class, - CacheMvccClientReconnectTest.class + CacheMvccClientReconnectTest.class, + MvccStructuresOverheadTest.class }) public class IgniteCacheMvccTestSuite { }