Repository: atlas
Updated Branches:
  refs/heads/master 6e7aa6ed3 -> 479b9ab1c
ATLAS-2667: Enhance GraphTransactionInterceptor to deal with nested/inner commits

Change-Id: I9ea29deb9aea226f077f4d008d459fdb3ac6663f


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/479b9ab1
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/479b9ab1
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/479b9ab1

Branch: refs/heads/master
Commit: 479b9ab1c0d461e89d14d2d3426af64f3cb9acb9
Parents: 6e7aa6e
Author: apoorvnaik <apoorvn...@apache.org>
Authored: Tue May 8 15:28:47 2018 -0700
Committer: apoorvnaik <apoorvn...@apache.org>
Committed: Thu May 10 07:58:06 2018 -0700

----------------------------------------------------------------------
 .../atlas/GraphTransactionInterceptor.java      | 102 +++++++++++++++----
 1 file changed, 83 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/479b9ab1/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
index b3e690f..4c43677 100644
--- a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
+++ b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
@@ -21,14 +21,15 @@ import com.google.common.annotations.VisibleForTesting;
 import org.aopalliance.intercept.MethodInterceptor;
 import org.aopalliance.intercept.MethodInvocation;
 import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.exception.NotFoundException;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 import javax.inject.Inject;
 import javax.ws.rs.core.Response;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -41,8 +42,10 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
     private static final Logger LOG = LoggerFactory.getLogger(GraphTransactionInterceptor.class);
 
     @VisibleForTesting
-    private static final ObjectUpdateSynchronizer OBJECT_UPDATE_SYNCHRONIZER = new ObjectUpdateSynchronizer();
-    private static final ThreadLocal<List<PostTransactionHook>> postTransactionHooks = new ThreadLocal<>();
+    private static final ObjectUpdateSynchronizer               OBJECT_UPDATE_SYNCHRONIZER = new ObjectUpdateSynchronizer();
+    private static final ThreadLocal<List<PostTransactionHook>> postTransactionHooks       = new ThreadLocal<>();
+    private static final ThreadLocal<Boolean>                   isTxnOpen                  = ThreadLocal.withInitial(() -> Boolean.FALSE);
+    private static final ThreadLocal<Boolean>                   innerFailure               = ThreadLocal.withInitial(() -> Boolean.FALSE);
 
     private final AtlasGraph graph;
 
@@ -53,39 +56,72 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
 
     @Override
     public Object invoke(MethodInvocation invocation) throws Throwable {
+        Method method = invocation.getMethod();
+        String invokingClass = method.getDeclaringClass().getSimpleName();
+        String invokedMethodName = method.getName();
+
+        boolean isInnerTxn = isTxnOpen.get();
+        // Outermost txn marks any subsequent transaction as inner
+        isTxnOpen.set(Boolean.TRUE);
+
+        if (LOG.isDebugEnabled() && isInnerTxn) {
+            LOG.debug("Txn entry-point {}.{} is inner txn. Commit/Rollback will be ignored", invokingClass, invokedMethodName);
+        }
+
         boolean isSuccess = false;
 
         try {
             try {
                 Object response = invocation.proceed();
 
-                graph.commit();
-                isSuccess = true;
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("graph commit");
+                if (isInnerTxn) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Ignoring commit for nested/inner transaction {}.{}", invokingClass, invokedMethodName);
+                    }
+                } else {
+                    doCommitOrRollback(invokingClass, invokedMethodName);
                 }
 
+                isSuccess = !innerFailure.get();
+
                 return response;
             } catch (Throwable t) {
-                if (logException(t)) {
-                    LOG.error("graph rollback due to exception ", t);
+                if (isInnerTxn) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Ignoring rollback for nested/inner transaction {}.{}", invokingClass, invokedMethodName);
+                    }
+                    innerFailure.set(true);
                 } else {
-                    LOG.error("graph rollback due to exception {}:{}", t.getClass().getSimpleName(), t.getMessage());
+                    doRollback(t);
                 }
-                graph.rollback();
                 throw t;
             }
         } finally {
-            List<PostTransactionHook> trxHooks = postTransactionHooks.get();
+            // Only outer txn can mark as closed
+            if (!isInnerTxn) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Closing outer txn");
+                }
+
+                // Reset the boolean flags
+                isTxnOpen.set(Boolean.FALSE);
+                innerFailure.set(Boolean.FALSE);
+
+                List<PostTransactionHook> trxHooks = postTransactionHooks.get();
 
-            if (trxHooks != null) {
-                postTransactionHooks.remove();
+                if (trxHooks != null) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Processing post-txn hooks");
+                    }
+
+                    postTransactionHooks.remove();
 
-                for (PostTransactionHook trxHook : trxHooks) {
-                    try {
-                        trxHook.onComplete(isSuccess);
-                    } catch (Throwable t) {
-                        LOG.error("postTransactionHook failed", t);
+                    for (PostTransactionHook trxHook : trxHooks) {
+                        try {
+                            trxHook.onComplete(isSuccess);
+                        } catch (Throwable t) {
+                            LOG.error("postTransactionHook failed", t);
+                        }
                     }
                 }
             }
@@ -94,6 +130,34 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
         }
     }
 
+    private void doCommitOrRollback(final String invokingClass, final String invokedMethodName) {
+        if (innerFailure.get()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Inner/Nested call threw exception. Rollback on txn entry-point, {}.{}", invokingClass, invokedMethodName);
+            }
+            graph.rollback();
+        } else {
+            doCommit(invokingClass, invokedMethodName);
+        }
+    }
+
+    private void doCommit(final String invokingClass, final String invokedMethodName) {
+        graph.commit();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Graph commit txn {}.{}", invokingClass, invokedMethodName);
+        }
+    }
+
+    private void doRollback(final Throwable t) {
+        if (logException(t)) {
+            LOG.error("graph rollback due to exception ", t);
+        } else {
+            LOG.error("graph rollback due to exception {}:{}", t.getClass().getSimpleName(), t.getMessage());
+        }
+        graph.rollback();
+    }
+
     public static void lockObjectAndReleasePostCommit(final String guid) {
         OBJECT_UPDATE_SYNCHRONIZER.lockObject(guid);
     }