This is an automated email from the ASF dual-hosted git repository. mivanac pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new d73bcd9a2a GEODE-10335: TXManagerImpl.close resets currentInstance (#7844) d73bcd9a2a is described below commit d73bcd9a2aa0eeb94fad0e9d225a87b9af024000 Author: Mario Ivanac <48509724+miva...@users.noreply.github.com> AuthorDate: Thu Sep 8 08:10:05 2022 +0200 GEODE-10335: TXManagerImpl.close resets currentInstance (#7844) * GEODE-10335: TXManagerImpl.close resets currentInstance * GEODE-10335: added test --- .../apache/geode/internal/cache/TXManagerImpl.java | 32 ++-- .../geode/internal/cache/TXManagerImplTest.java | 164 +++++++++++++-------- 2 files changed, 122 insertions(+), 74 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java index 96b8a2a7d5..1dacdcd06d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java @@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; import org.apache.logging.log4j.Logger; @@ -88,7 +89,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene private final ThreadLocal<Boolean> pauseJTA; @MakeNotStatic - private static TXManagerImpl currentInstance = null; + private static final AtomicReference<TXManagerImpl> currentInstance = new AtomicReference<>(); // The unique transaction ID for this Manager private final AtomicInteger uniqId; @@ -202,16 +203,16 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene isTXDistributed = new ThreadLocal<>(); transactionTimeToLive = Integer .getInteger(GeodeGlossary.GEMFIRE_PREFIX + "cacheServer.transactionTimeToLive", 180); - currentInstance = this; + currentInstance.set(this); this.statisticsClock = statisticsClock; } public static TXManagerImpl getCurrentInstanceForTest() { - return currentInstance; + return currentInstance.get(); } public static void setCurrentInstanceForTest(TXManagerImpl instance) { - currentInstance = instance; + currentInstance.set(instance); } InternalCache getCache() { @@ -687,6 +688,10 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene for (final TransactionListener listener : listeners) { closeListener(listener); } + TXManagerImpl instance = currentInstance.get(); + if (instance != null) { + currentInstance.set(null); + } } private void closeListener(TransactionListener tl) { @@ -855,17 +860,19 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene } public static int getCurrentTXUniqueId() { - if (currentInstance == null) { + TXManagerImpl instance = currentInstance.get(); + if (instance == null) { return NOTX; } - return currentInstance.getMyTXUniqueId(); + return instance.getMyTXUniqueId(); } public static TXStateProxy getCurrentTXState() { - if (currentInstance == null) { + TXManagerImpl instance = currentInstance.get(); + if (instance == null) { return null; } - return currentInstance.getTXState(); + return instance.getTXState(); } public int getMyTXUniqueId() { @@ -1633,7 +1640,10 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene @Override public void run2() { - TXManagerImpl mgr = TXManagerImpl.currentInstance; + TXManagerImpl mgr = TXManagerImpl.currentInstance.get(); + if (mgr == null) { + return; + } TXStateProxy tx = mgr.suspendedTXs.remove(txId); if (tx != null) { try { @@ -1781,7 +1791,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene }; public static void incRefCount(AbstractRegionEntry re) { - TXManagerImpl mgr = currentInstance; + TXManagerImpl mgr = currentInstance.get(); if (mgr != null) { mgr.refCountMap.create(re, incCallback, null, null, true); } @@ -1791,7 +1801,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene * Return true if refCount went to zero. */ public static boolean decRefCount(AbstractRegionEntry re) { - TXManagerImpl mgr = currentInstance; + TXManagerImpl mgr = currentInstance.get(); if (mgr != null) { return mgr.refCountMap.removeConditionally(re, decCallback, null, null) != null; } else { diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java index 1d3e4fd6da..24b1af2ccb 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java @@ -17,12 +17,7 @@ package org.apache.geode.internal.cache; import static org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock; import static org.apache.geode.test.awaitility.GeodeAwaitility.await; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; @@ -105,9 +100,12 @@ public class TXManagerImplTest { public void getOrSetHostedTXStateAbleToSetTXStateAndGetLock() { TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg); - assertNotNull(tx); - assertEquals(tx, txMgr.getHostedTXState(txid)); - assertTrue(txMgr.getLock(tx, txid)); + assertThat(tx).isNotNull(); + assertThat(txMgr.getHostedTXState(txid)).isEqualTo(tx); + assertThat(txMgr.getLock(tx, txid)).isTrue(); + + txMgr.close(); + assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNull(); } @Test @@ -116,9 +114,13 @@ public class TXManagerImplTest { TXManagerImpl txManager = new TXManagerImpl(mock(CachePerfStats.class), cache, disabledClock()); txManager.setDistributed(false); TXStateProxy proxy = txManager.beginJTA(); - assertEquals(1, proxy.getTxId().getUniqId()); - assertNotNull(txManager); + assertThat(proxy.getTxId().getUniqId()).isEqualTo(1); + assertThat(txManager).isNotNull(); TXManagerImpl.INITIAL_UNIQUE_ID_VALUE = 0; + assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNotNull(); + + txManager.close(); + assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNull(); } @Test @@ -126,22 +128,26 @@ public class TXManagerImplTest { TXManagerImpl txManager = new TXManagerImpl(mock(CachePerfStats.class), cache, disabledClock()); txManager.setDistributed(false); TXStateProxy proxy = txManager.beginJTA(); - assertEquals(1, proxy.getTxId().getUniqId()); - assertNotNull(txManager); + assertThat(proxy.getTxId().getUniqId()).isEqualTo(1); + assertThat(txManager).isNotNull(); + assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNotNull(); + + txManager.close(); + assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNull(); } @Test public void getLockAfterTXStateRemoved() throws InterruptedException { TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg); - assertEquals(tx, txMgr.getHostedTXState(txid)); - assertTrue(txMgr.getLock(tx, txid)); - assertNotNull(tx); - assertTrue(txMgr.getLock(tx, txid)); + assertThat(txMgr.getHostedTXState(txid)).isEqualTo(tx); + assertThat(txMgr.getLock(tx, txid)).isTrue(); + assertThat(tx).isNotNull(); + assertThat(txMgr.getLock(tx, txid)).isTrue(); tx.getLock().unlock(); TXStateProxy oldtx = txMgr.getOrSetHostedTXState(txid, msg); - assertEquals(tx, oldtx); + assertThat(oldtx).isEqualTo(tx); Thread t1 = new Thread(() -> txMgr.removeHostedTXState(txid)); t1.start(); @@ -149,12 +155,12 @@ public class TXManagerImplTest { t1.join(); TXStateProxy curTx = txMgr.getHostedTXState(txid); - assertNull(curTx); + assertThat(curTx).isNull(); // after failover command removed the txid from hostedTXState, // getLock should put back the original TXStateProxy - assertTrue(txMgr.getLock(tx, txid)); - assertEquals(tx, txMgr.getHostedTXState(txid)); + assertThat(txMgr.getLock(tx, txid)).isTrue(); + assertThat(txMgr.getHostedTXState(txid)).isEqualTo(tx); tx.getLock().unlock(); } @@ -163,13 +169,13 @@ public class TXManagerImplTest { public void getLockAfterTXStateReplaced() throws InterruptedException { TXStateProxy oldtx = txMgr.getOrSetHostedTXState(txid, msg); - assertEquals(oldtx, txMgr.getHostedTXState(txid)); - assertTrue(txMgr.getLock(oldtx, txid)); - assertNotNull(oldtx); + assertThat(txMgr.getHostedTXState(txid)).isEqualTo(oldtx); + assertThat(txMgr.getLock(oldtx, txid)).isTrue(); + assertThat(oldtx).isNotNull(); oldtx.getLock().unlock(); TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg); - assertEquals(tx, oldtx); + assertThat(oldtx).isEqualTo(tx); Thread t1 = new Thread(() -> { txMgr.removeHostedTXState(txid); @@ -181,12 +187,12 @@ public class TXManagerImplTest { t1.join(); TXStateProxy curTx = txMgr.getHostedTXState(txid); - assertNotNull(curTx); + assertThat(curTx).isNotNull(); // replaced - assertNotEquals(tx, curTx); + assertThat(curTx).isNotEqualTo(tx); // after TXStateProxy replaced, getLock will not get - assertFalse(txMgr.getLock(tx, txid)); + assertThat(txMgr.getLock(tx, txid)).isFalse(); } @@ -194,13 +200,13 @@ public class TXManagerImplTest { public void getLockAfterTXStateCommitted() throws InterruptedException { TXStateProxy oldtx = txMgr.getOrSetHostedTXState(txid, msg); - assertEquals(oldtx, txMgr.getHostedTXState(txid)); - assertTrue(txMgr.getLock(oldtx, txid)); - assertNotNull(oldtx); + assertThat(txMgr.getHostedTXState(txid)).isEqualTo(oldtx); + assertThat(txMgr.getLock(oldtx, txid)).isTrue(); + assertThat(oldtx).isNotNull(); oldtx.getLock().unlock(); TXStateProxy tx = txMgr.getOrSetHostedTXState(txid, msg); - assertEquals(tx, oldtx); + assertThat(oldtx).isEqualTo(tx); Thread t1 = new Thread(() -> { when(msg.getTXOriginatorClient()).thenReturn(mock(InternalDistributedMember.class)); @@ -225,12 +231,12 @@ public class TXManagerImplTest { t1.join(); TXStateProxy curTx = txMgr.getHostedTXState(txid); - assertNull(curTx); + assertThat(curTx).isNull(); - assertFalse(tx.isInProgress()); + assertThat(tx.isInProgress()).isFalse(); // after TXStateProxy committed, getLock will get the lock for the oldtx // but caller should not perform ops on this TXStateProxy - assertTrue(txMgr.getLock(tx, txid)); + assertThat(txMgr.getLock(tx, txid)).isTrue(); } @Test @@ -238,7 +244,10 @@ public class TXManagerImplTest { TXStateProxy tx; tx = txMgr.masqueradeAs(msg); - assertNotNull(tx); + assertThat(tx).isNotNull(); + + txMgr.close(); + assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNull(); } @Test @@ -247,10 +256,10 @@ public class TXManagerImplTest { Thread t1 = new Thread(() -> { tx1 = txMgr.getHostedTXState(txid); - assertNull(tx1); + assertThat(tx1).isNull(); tx1 = txMgr.getOrSetHostedTXState(txid, msg); - assertNotNull(tx1); - assertTrue(txMgr.getLock(tx1, txid)); + assertThat(tx1).isNotNull(); + assertThat(txMgr.getLock(tx1, txid)).isTrue(); latch.countDown(); @@ -260,19 +269,19 @@ public class TXManagerImplTest { txMgr.removeHostedTXState(txid); tx2 = txMgr.getOrSetHostedTXState(txid, msg); - assertNotNull(tx2); - assertTrue(txMgr.getLock(tx2, txid)); + assertThat(tx2).isNotNull(); + assertThat(txMgr.getLock(tx2, txid)).isTrue(); tx2.getLock().unlock(); tx1.getLock().unlock(); }); t1.start(); - assertTrue(latch.await(60, TimeUnit.SECONDS)); + assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); tx = txMgr.masqueradeAs(msg); - assertNotNull(tx); - assertEquals(tx, tx2); + assertThat(tx).isNotNull(); + assertThat(tx2).isEqualTo(tx); tx.getLock().unlock(); t1.join(); @@ -282,7 +291,10 @@ public class TXManagerImplTest { @Test public void testTxStateWithNotFinishedTx() { TXStateProxy tx = txMgr.getOrSetHostedTXState(notCompletedTxid, msg); - assertTrue(tx.isInProgress()); + assertThat(tx.isInProgress()).isTrue(); + + txMgr.close(); + assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNull(); } @Test @@ -296,7 +308,10 @@ public class TXManagerImplTest { } finally { txMgr.unmasquerade(tx); } - assertFalse(tx.isInProgress()); + assertThat(tx.isInProgress()).isFalse(); + + txMgr.close(); + assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNull(); } @Test @@ -310,7 +325,10 @@ public class TXManagerImplTest { } finally { txMgr.unmasquerade(tx); } - assertFalse(tx.isInProgress()); + assertThat(tx.isInProgress()).isFalse(); + + txMgr.close(); + assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNull(); } private void setupTx() throws InterruptedException { @@ -342,10 +360,10 @@ public class TXManagerImplTest { }); t1.start(); - assertTrue(latch.await(60, TimeUnit.SECONDS)); + assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); TXStateProxy tx = txMgr.masqueradeAs(rollbackMsg); - assertEquals(tx, tx1); + assertThat(tx1).isEqualTo(tx); t1.join(); rollbackTransaction(tx); } @@ -375,42 +393,50 @@ public class TXManagerImplTest { public void txStateNotCleanedupIfNotRemovedFromHostedTxStatesMap() { tx1 = txMgr.getOrSetHostedTXState(txid, msg); TXStateProxyImpl txStateProxy = (TXStateProxyImpl) tx1; - assertNotNull(txStateProxy); - assertFalse(txStateProxy.getLocalRealDeal().isClosed()); + assertThat(txStateProxy).isNotNull(); + assertThat(txStateProxy.getLocalRealDeal().isClosed()).isFalse(); txMgr.masqueradeAs(tx1); txMgr.unmasquerade(tx1); - assertFalse(txStateProxy.getLocalRealDeal().isClosed()); + assertThat(txStateProxy.getLocalRealDeal().isClosed()).isFalse(); + txMgr.close(); + assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNull(); } @Test public void txStateCleanedUpIfRemovedFromHostedTxStatesMapCausedByFailover() { tx1 = txMgr.getOrSetHostedTXState(txid, msg); TXStateProxyImpl txStateProxy = (TXStateProxyImpl) tx1; - assertNotNull(txStateProxy); - assertFalse(txStateProxy.getLocalRealDeal().isClosed()); + assertThat(txStateProxy).isNotNull(); + assertThat(txStateProxy.getLocalRealDeal().isClosed()).isFalse(); txStateProxy.setRemovedCausedByFailover(true); txMgr.masqueradeAs(tx1); // during TX failover, tx can be removed from the hostedTXStates map by FindRemoteTXMessage txMgr.getHostedTXStates().remove(txid); txMgr.unmasquerade(tx1); - assertTrue(txStateProxy.getLocalRealDeal().isClosed()); + assertThat(txStateProxy.getLocalRealDeal().isClosed()).isTrue(); + + txMgr.close(); + assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNull(); } @Test public void txStateDoesNotCleanUpIfRemovedFromHostedTxStatesMapNotCausedByFailover() { tx1 = txMgr.getOrSetHostedTXState(txid, msg); TXStateProxyImpl txStateProxy = (TXStateProxyImpl) tx1; - assertNotNull(txStateProxy); - assertFalse(txStateProxy.getLocalRealDeal().isClosed()); + assertThat(txStateProxy).isNotNull(); + assertThat(txStateProxy.getLocalRealDeal().isClosed()).isFalse(); txMgr.masqueradeAs(tx1); // during TX failover, tx can be removed from the hostedTXStates map by FindRemoteTXMessage txMgr.getHostedTXStates().remove(txid); txMgr.unmasquerade(tx1); - assertFalse(txStateProxy.getLocalRealDeal().isClosed()); + assertThat(txStateProxy.getLocalRealDeal().isClosed()).isFalse(); + + txMgr.close(); + assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNull(); } @Test @@ -422,7 +448,10 @@ public class TXManagerImplTest { txMgr.scheduleToRemoveExpiredClientTransaction(txid); - assertTrue(txMgr.isHostedTXStatesEmpty()); + assertThat(txMgr.isHostedTXStatesEmpty()).isTrue(); + + txMgr.close(); + assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNull(); } @Test @@ -450,7 +479,7 @@ public class TXManagerImplTest { doReturn(0).when(spyTxMgr).getTransactionTimeToLive(); when(txIds.iterator()).thenAnswer( (Answer<Iterator<TXId>>) invocation -> Arrays.asList(txId1, txId3).iterator()); - assertEquals(2, spyTxMgr.getHostedTXStates().size()); + assertThat(spyTxMgr.getHostedTXStates()).hasSize(2); spyTxMgr.expireDisconnectedClientTransactions(txIds, false); @@ -458,8 +487,8 @@ public class TXManagerImplTest { verify(spyTxMgr, times(1)).removeHostedTXState(eq(txIds)); verify(spyTxMgr, times(1)).removeHostedTXState(eq(txId1)); verify(spyTxMgr, times(1)).removeHostedTXState(eq(txId3)); - assertEquals(tx2, spyTxMgr.getHostedTXStates().get(txId2)); - assertEquals(1, spyTxMgr.getHostedTXStates().size()); + assertThat(spyTxMgr.getHostedTXStates().get(txId2)).isEqualTo(tx2); + assertThat(spyTxMgr.getHostedTXStates()).hasSize(1); } @Test @@ -542,7 +571,10 @@ public class TXManagerImplTest { TXStateProxy tx; tx = txMgr.masqueradeAs(msg); - assertNotNull(tx.getTarget()); + assertThat(tx.getTarget()).isNotNull(); + + txMgr.close(); + assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNull(); } @Test @@ -554,6 +586,9 @@ public class TXManagerImplTest { txMgr.removeHostedTXState(txid, true); verify(txStateProxy).setRemovedCausedByFailover(eq(true)); + + txMgr.close(); + assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNull(); } @Test @@ -565,6 +600,9 @@ public class TXManagerImplTest { txMgr.removeHostedTXState(txid); verify(txStateProxy, never()).setRemovedCausedByFailover(eq(true)); + + txMgr.close(); + assertThat(TXManagerImpl.getCurrentInstanceForTest()).isNull(); } }