This is an automated email from the ASF dual-hosted git repository. eshu11 pushed a commit to branch feature/GEODE-5624 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 413b34f3a63af62094925b16f280ae4a641d4dc6 Author: eshu <e...@pivotal.io> AuthorDate: Thu Aug 23 14:24:48 2018 -0700 GEODE-5624: Use a thread to do beforeCompletion and afterCompletion. --- .../ClientServerJTAFailoverDistributedTest.java | 25 ++ .../org/apache/geode/internal/cache/TXState.java | 268 ++++++++++++++++++--- .../cache/TXStateSynchronizationRunnable.java | 144 +++++++++++ 3 files changed, 400 insertions(+), 37 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java index c623766..e4589fa 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java @@ -59,12 +59,14 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable { private String hostName; private String uniqueName; private String regionName; + private String replicateRegionName; private VM server1; private VM server2; private VM server3; private VM client1; private int port1; private int port2; + private boolean hasReplicateRegion = false; private final int key = 1; private final String value = "value1"; @@ -92,6 +94,7 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable { hostName = getHostName(); uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName(); regionName = uniqueName + "_region"; + replicateRegionName = uniqueName + "_replicate_region"; } @Test @@ -126,6 +129,9 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable { PartitionAttributes partitionAttributes = factory.create(); cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.PARTITION) .setPartitionAttributes(partitionAttributes).create(regionName); + if 
(hasReplicateRegion) { + cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.REPLICATE).create(replicateRegionName); + } CacheServer server = cacheRule.getCache().addCacheServer(); server.setPort(0); @@ -148,6 +154,7 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable { clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.LOCAL); crf.setPoolName(pool.getName()); crf.create(regionName); + if (hasReplicateRegion) crf.create(replicateRegionName); if (ports.length > 1) { pool.acquireConnection(new ServerLocation(hostName, port1)); @@ -171,9 +178,11 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable { Object[] results = new Object[2]; InternalClientCache cache = clientCacheRule.getClientCache(); Region region = cache.getRegion(regionName); + Region replicateRegion = hasReplicateRegion? cache.getRegion(replicateRegionName) : null; TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager(); txManager.begin(); region.put(key, newValue); + if (hasReplicateRegion) replicateRegion.put(key, newValue); TXStateProxyImpl txStateProxy = (TXStateProxyImpl) txManager.getTXState(); ClientTXStateStub clientTXStateStub = (ClientTXStateStub) txStateProxy.getRealDeal(null, null); @@ -188,6 +197,7 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable { private void doAfterCompletion(TransactionId transactionId, boolean isCommit) { InternalClientCache cache = clientCacheRule.getClientCache(); Region region = cache.getRegion(regionName); + Region replicateRegion = cache.getRegion(replicateRegionName); TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager(); txManager.resume(transactionId); @@ -202,6 +212,7 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable { } if (isCommit) { assertEquals(newValue, region.get(key)); + if (hasReplicateRegion) assertEquals(newValue, replicateRegion.get(key)); } else { 
assertEquals(value, region.get(key)); } @@ -292,4 +303,18 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable { txStateStub.beforeCompletion(); } + @Test + public void jtaCanFailoverToJTAHostForMixedRegionsAfterDoneBeforeCompletion() { + hasReplicateRegion = true; + port2 = server2.invoke(() -> createServerRegion(1, false)); + server2.invoke(() -> doPut(key, value)); + port1 = server1.invoke(() -> createServerRegion(1, true)); + + client1.invoke(() -> createClientRegion(port1, port2)); + Object[] beforeCompletionResults = client1.invoke(() -> doBeforeCompletion()); + + server1.invoke(() -> cacheRule.getCache().close()); + + client1.invoke(() -> doAfterCompletion((TransactionId) beforeCompletionResults[0], true)); + } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java index 9494fd3..f33267e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Executor; import java.util.concurrent.locks.ReentrantLock; import javax.transaction.Status; @@ -51,6 +52,7 @@ import org.apache.geode.cache.UnsupportedOperationInTransactionException; import org.apache.geode.cache.client.internal.ServerRegionDataAccess; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.TXManagerCancelledException; +import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.Assert; import org.apache.geode.internal.cache.control.MemoryThresholds; @@ -101,6 +103,15 @@ public class TXState implements TXStateInterface { // Access this variable should be in synchronized block. 
private boolean beforeCompletionCalled; + /** + * for client/server JTA transactions we need to have a single thread handle both beforeCompletion + * and afterCompletion so that beforeCompletion can obtain locks for the afterCompletion step. + * This is that thread + */ + protected volatile TXStateSynchronizationRunnable syncRunnable; + private volatile SynchronizationCommitConflictException beforeCompletionException; + private volatile RuntimeException afterCompletionException; + // Internal testing hooks private Runnable internalAfterReservation; protected Runnable internalAfterConflictCheck; @@ -910,6 +921,9 @@ public class TXState implements TXStateInterface { synchronized (this.completionGuard) { this.completionGuard.notifyAll(); } + if (this.syncRunnable != null) { + this.syncRunnable.abort(); + } if (iae != null && !this.proxy.getCache().isClosed()) { throw iae; } @@ -996,6 +1010,75 @@ public class TXState implements TXStateInterface { } } +// ////////////////////////////////////////////////////////////////// +// // JTA Synchronization implementation // +// ////////////////////////////////////////////////////////////////// +// /* +// * (non-Javadoc) +// * +// * @see org.apache.geode.internal.cache.TXStateInterface#beforeCompletion() +// */ +// @Override +// public synchronized void beforeCompletion() throws SynchronizationCommitConflictException { +// if (this.closed) { +// throw new TXManagerCancelledException(); +// } +// if (beforeCompletionCalled) { +// // do not re-execute beforeCompletion again +// return; +// } +// beforeCompletionCalled = true; +// doBeforeCompletion(); +// } +// +// private void doBeforeCompletion() { +// final long opStart = CachePerfStats.getStatTime(); +// this.jtaLifeTime = opStart - getBeginTime(); +// try { +// reserveAndCheck(); +// /* +// * If there is a TransactionWriter plugged in, we need to to give it an opportunity to abort +// * the transaction. 
+// */ +// TransactionWriter writer = this.proxy.getTxMgr().getWriter(); +// if (writer != null) { +// try { +// // need to mark this so we don't fire again in commit +// firedWriter = true; +// TXEvent event = getEvent(); +// if (!event.hasOnlyInternalEvents()) { +// writer.beforeCommit(event); +// } +// } catch (TransactionWriterException twe) { +// throw new CommitConflictException(twe); +// } catch (VirtualMachineError err) { +// // cleanup(); this allocates objects so I don't think we can do it - that leaves the TX +// // open, but we are poison pilling so we should be ok?? +// +// SystemFailure.initiateFailure(err); +// // If this ever returns, rethrow the error. We're poisoned +// // now, so don't let this thread continue. +// throw err; +// } catch (Throwable t) { +// // Whenever you catch Error or Throwable, you must also +// // catch VirtualMachineError (see above). However, there is +// // _still_ a possibility that you are dealing with a cascading +// // error condition, so you also need to check to see if the JVM +// // is still usable: +// SystemFailure.checkFailure(); +// throw new CommitConflictException(t); +// } +// } +// } catch (CommitConflictException commitConflict) { +// cleanup(); +// proxy.getTxMgr().noteCommitFailure(opStart, this.jtaLifeTime, this); +// throw new SynchronizationCommitConflictException( +// LocalizedStrings.TXState_CONFLICT_DETECTED_IN_GEMFIRE_TRANSACTION_0 +// .toLocalizedString(getTransactionId()), +// commitConflict); +// } +// } + ////////////////////////////////////////////////////////////////// // JTA Synchronization implementation // ////////////////////////////////////////////////////////////////// @@ -1009,17 +1092,57 @@ public class TXState implements TXStateInterface { if (this.closed) { throw new TXManagerCancelledException(); } + if (beforeCompletionCalled) { // do not re-execute beforeCompletion again return; } beforeCompletionCalled = true; - doBeforeCompletion(); + + TXStateSynchronizationRunnable sync = 
createTxStateSynchronizationRunnable(); + setSynchronizationRunnable(sync); + + Executor exec = getExecutor(); + exec.execute(sync); + sync.waitForFirstExecution(); + if (getBeforeCompletionException() != null) { + throw getBeforeCompletionException(); + } + //doBeforeCompletion(); } + TXStateSynchronizationRunnable createTxStateSynchronizationRunnable() { + Runnable beforeCompletion = new Runnable() { + @SuppressWarnings("synthetic-access") + public void run() { + doBeforeCompletion(); + } + }; + + return new TXStateSynchronizationRunnable(getCache().getCancelCriterion(), + beforeCompletion); + } + + Executor getExecutor() { + return InternalDistributedSystem.getConnectedInstance().getDistributionManager() + .getWaitingThreadPool(); + } + + SynchronizationCommitConflictException getBeforeCompletionException() { + return beforeCompletionException; + } + + private void setSynchronizationRunnable(TXStateSynchronizationRunnable synchronizationRunnable) { + syncRunnable = synchronizationRunnable; + } + + private void doBeforeCompletion() { + proxy.getTxMgr().setTXState(null); final long opStart = CachePerfStats.getStatTime(); this.jtaLifeTime = opStart - getBeginTime(); + + try { reserveAndCheck(); /* @@ -1066,17 +1189,28 @@ public class TXState implements TXStateInterface { } /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TXStateInterface#afterCompletion(int) - */ + * (non-Javadoc) + * + * @see org.apache.geode.internal.cache.TXStateInterface#afterCompletion(int) + */ @Override public synchronized void afterCompletion(int status) { - this.proxy.getTxMgr().setTXState(null); - // For commit, beforeCompletion should be called. Otherwise + proxy.getTxMgr().setTXState(null); + Runnable afterCompletion = new Runnable() { + @SuppressWarnings("synthetic-access") + public void run() { + doAfterCompletion(status); + } + }; + // if there was a beforeCompletion call then there will be a thread + // sitting in the waiting pool to execute afterCompletion. 
Otherwise // throw FailedSynchronizationException(). - if (wasBeforeCompletionCalled()) { - doAfterCompletion(status); + TXStateSynchronizationRunnable sync = getSynchronizationRunnable(); + if (sync != null) { + sync.runSecondRunnable(afterCompletion); + if (getAfterCompletionException() != null) { + throw getAfterCompletionException(); + } } else { // rollback does not run beforeCompletion. if (status != Status.STATUS_ROLLEDBACK) { @@ -1087,37 +1221,97 @@ public class TXState implements TXStateInterface { } } + TXStateSynchronizationRunnable getSynchronizationRunnable() { + return this.syncRunnable; + } + + RuntimeException getAfterCompletionException() { + return afterCompletionException; + } + private void doAfterCompletion(int status) { final long opStart = CachePerfStats.getStatTime(); - try { - switch (status) { - case Status.STATUS_COMMITTED: - Assert.assertTrue(this.locks != null, - "Gemfire Transaction afterCompletion called with illegal state."); - try { - commit(); - saveTXCommitMessageForClientFailover(); - } catch (CommitConflictException error) { - Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId() - + " afterCompletion failed.due to CommitConflictException: " + error); - } - - this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime, this); - this.locks = null; - break; - case Status.STATUS_ROLLEDBACK: - this.jtaLifeTime = opStart - getBeginTime(); - rollback(); + switch (status) { + case Status.STATUS_COMMITTED: + Assert.assertTrue(this.locks != null, + "Gemfire Transaction afterCompletion called with illegal state."); + try { + proxy.getTxMgr().setTXState(null); + commit(); saveTXCommitMessageForClientFailover(); - this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this); - break; - default: - Assert.assertTrue(false, "Unknown JTA Synchronization status " + status); - } - } catch (InternalGemFireError error) { - throw new TransactionException(error); - } - } + } catch (CommitConflictException error) { + 
Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId() + + " afterCompletion failed.due to CommitConflictException: " + error); + } + + this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime, this); + this.locks = null; + break; + case Status.STATUS_ROLLEDBACK: + this.jtaLifeTime = opStart - getBeginTime(); + this.proxy.getTxMgr().setTXState(null); + rollback(); + saveTXCommitMessageForClientFailover(); + this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this); + break; + default: + Assert.assertTrue(false, "Unknown JTA Synchronization status " + status); + } + } + +// /* +// * (non-Javadoc) +// * +// * @see org.apache.geode.internal.cache.TXStateInterface#afterCompletion(int) +// */ +// @Override +// public synchronized void afterCompletion(int status) { +// this.proxy.getTxMgr().setTXState(null); +// // For commit, beforeCompletion should be called. Otherwise +// // throw FailedSynchronizationException(). +// if (wasBeforeCompletionCalled()) { +// doAfterCompletion(status); +// } else { +// // rollback does not run beforeCompletion. 
+// if (status != Status.STATUS_ROLLEDBACK) { +// throw new FailedSynchronizationException( +// "Could not execute afterCompletion when beforeCompletion was not executed"); +// } +// doAfterCompletion(status); +// } +// } +// +// private void doAfterCompletion(int status) { +// final long opStart = CachePerfStats.getStatTime(); +// try { +// switch (status) { +// case Status.STATUS_COMMITTED: +// Assert.assertTrue(this.locks != null, +// "Gemfire Transaction afterCompletion called with illegal state."); +// try { +// commit(); +// saveTXCommitMessageForClientFailover(); +// } catch (CommitConflictException error) { +// Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId() +// + " afterCompletion failed.due to CommitConflictException: " + error); +// } +// +// this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime, this); +// this.locks = null; +// break; +// case Status.STATUS_ROLLEDBACK: +// this.jtaLifeTime = opStart - getBeginTime(); +// rollback(); +// saveTXCommitMessageForClientFailover(); +// this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this); +// break; +// default: +// Assert.assertTrue(false, "Unknown JTA Synchronization status " + status); +// } +// } catch (InternalGemFireError error) { +// throw new TransactionException(error); +// } +// } boolean wasBeforeCompletionCalled() { return beforeCompletionCalled; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java new file mode 100644 index 0000000..28f367b --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. 
The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CancelCriterion; +import org.apache.geode.internal.cache.tier.sockets.CommBufferPool; +import org.apache.geode.internal.logging.LogService; + +/** + * TXStateSynchronizationRunnable manages beforeCompletion and afterCompletion calls. + * It should be instantiated with a Runnable that invokes beforeCompletion behavior. + * Then you must invoke runSecondRunnable() with another Runnable that invokes afterCompletion + * behavior.
+ * + * @since Geode 1.6.0 + */ +public class TXStateSynchronizationRunnable implements Runnable { + private static final Logger logger = LogService.getLogger(); + + private final CancelCriterion cancelCriterion; + + private Runnable firstRunnable; + private final Object firstRunnableSync = new Object(); + private boolean firstRunnableCompleted; + + private Runnable secondRunnable; + private final Object secondRunnableSync = new Object(); + private boolean secondRunnableCompleted; + + private boolean abort; + + public TXStateSynchronizationRunnable(final CancelCriterion cancelCriterion, final Runnable beforeCompletion) { + this.cancelCriterion = cancelCriterion; + this.firstRunnable = beforeCompletion; + } + + @Override + public void run() { + doSynchronizationOps(); + } + + private void doSynchronizationOps() { + synchronized (this.firstRunnableSync) { + try { + this.firstRunnable.run(); + } finally { + if (logger.isTraceEnabled()) { + logger.trace("beforeCompletion notification completed"); + } + this.firstRunnableCompleted = true; + this.firstRunnable = null; + this.firstRunnableSync.notifyAll(); + } + } + synchronized (this.secondRunnableSync) { + // TODO there should be a transaction timeout that keeps this thread + // from sitting around forever if the client goes away + final boolean isTraceEnabled = logger.isTraceEnabled(); + while (this.secondRunnable == null && !this.abort) { + try { + if (isTraceEnabled) { + logger.trace("waiting for afterCompletion notification"); + } + this.secondRunnableSync.wait(1000); + } catch (InterruptedException ignore) { + // eat the interrupt and check for exit conditions + } + } + if (isTraceEnabled) { + logger.trace("executing afterCompletion notification"); + } + try { + if (!this.abort) { + this.secondRunnable.run(); + } + } finally { + if (isTraceEnabled) { + logger.trace("afterCompletion notification completed"); + } + this.secondRunnableCompleted = true; + this.secondRunnable = null; + 
this.secondRunnableSync.notifyAll(); + } + } + } + + /** + * wait for the initial beforeCompletion step to finish + */ + public void waitForFirstExecution() { + synchronized (this.firstRunnableSync) { + while (!this.firstRunnableCompleted) { + try { + this.firstRunnableSync.wait(1000); + } catch (InterruptedException ignore) { + // eat the interrupt and check for exit conditions + } + cancelCriterion.checkCancelInProgress(null); + } + } + } + + /** + * run the afterCompletion portion of synchronization. This method schedules execution of the + * given runnable and then waits for it to finish running + */ + public void runSecondRunnable(Runnable r) { + synchronized (this.secondRunnableSync) { + this.secondRunnable = r; + this.secondRunnableSync.notifyAll(); + while (!this.secondRunnableCompleted && !this.abort) { + try { + this.secondRunnableSync.wait(1000); + } catch (InterruptedException ignore) { + // eat the interrupt and check for exit conditions + } + cancelCriterion.checkCancelInProgress(null); + } + } + } + + /** + * stop waiting for an afterCompletion to arrive and just exit + */ + public void abort() { + synchronized (this.secondRunnableSync) { + this.abort = true; + } + } +}