http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java ---------------------------------------------------------------------- diff --cc modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java index 722b749,2f7e6c0..afecd17 --- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java +++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java @@@ -44,19 -43,10 +43,23 @@@ import org.apache.ignite.internal.GridD import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.IgniteCodeGeneratingFail; +import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest; +import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishResponse; +import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxPrepareRequest; +import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxPrepareResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; +import org.apache.ignite.internal.processors.trace.EventsTrace; + import org.apache.ignite.internal.processors.cache.distributed.dht.GridInvokeValue; + import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistRequest; + import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistResponse; + import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 18a3729,4cb68da..ae369cf --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@@ -61,9 -66,9 +66,10 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.segmentation.GridSegmentationProcessor; import org.apache.ignite.internal.processors.service.GridServiceProcessor; import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor; + import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor; import org.apache.ignite.internal.processors.task.GridTaskProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; +import org.apache.ignite.internal.processors.trace.TraceProcessor; import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions; import org.apache.ignite.internal.util.IgniteExceptionRegistry; import org.apache.ignite.internal.util.StripedExecutor; @@@ -420,21 -439,16 +440,23 @@@ public interface GridKernalContext exte public DataStructuresProcessor dataStructures(); /** + * Gets trace processor. + * + * @return Trace processor. + */ + public TraceProcessor trace(); + + /** - * Sets segmented flag to {@code true} when node is stopped due to segmentation issues. + * Checks whether this node is invalid due to a critical error or not. + * + * @return {@code True} if this node is invalid, {@code false} otherwise. */ - public void markSegmented(); + public boolean invalid(); /** - * Gets segmented flag. + * Checks whether this node detected its segmentation from the rest of the grid. * - * @return {@code True} if network is currently segmented, {@code false} otherwise. + * @return {@code True} if this node has segmented, {@code false} otherwise. */ public boolean segmented(); http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 58522b8,a0e3f93..33d1d4c --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@@ -78,9 -85,9 +85,10 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.segmentation.GridSegmentationProcessor; import org.apache.ignite.internal.processors.service.GridServiceProcessor; import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor; + import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor; import org.apache.ignite.internal.processors.task.GridTaskProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; +import org.apache.ignite.internal.processors.trace.TraceProcessor; import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions; import org.apache.ignite.internal.util.IgniteExceptionRegistry; import org.apache.ignite.internal.util.StripedExecutor; @@@ -283,12 -291,16 +292,20 @@@ public class GridKernalContextImpl impl @GridToStringExclude private DataStructuresProcessor dataStructuresProc; + /** Cache mvcc coordinators. */ + @GridToStringExclude + private MvccProcessor coordProc; + + /** */ + @GridToStringExclude + private IgniteAuthenticationProcessor authProc; + /** */ @GridToStringExclude + private TraceProcessor traceProc; + + /** */ + @GridToStringExclude private List<GridComponent> comps = new LinkedList<>(); /** */ @@@ -581,11 -618,17 +623,19 @@@ else if (comp instanceof PlatformProcessor) platformProc = (PlatformProcessor)comp; else if (comp instanceof PoolProcessor) - poolProc = (PoolProcessor) comp; + poolProc = (PoolProcessor)comp; else if (comp instanceof GridMarshallerMappingProcessor) mappingProc = (GridMarshallerMappingProcessor)comp; + else if (comp instanceof MvccProcessor) + coordProc = (MvccProcessor)comp; + else if (comp instanceof PdsFoldersResolver) + pdsFolderRslvr = (PdsFoldersResolver)comp; + else if (comp instanceof GridInternalSubscriptionProcessor) + internalSubscriptionProc = (GridInternalSubscriptionProcessor)comp; + else if (comp instanceof IgniteAuthenticationProcessor) + authProc = (IgniteAuthenticationProcessor)comp; + else if (comp instanceof TraceProcessor) + traceProc = (TraceProcessor)comp; else if (!(comp instanceof DiscoveryNodeValidationProcessor || comp instanceof PlatformPluginProcessor)) assert (comp instanceof GridPluginComponent) : "Unknown manager class: " + comp.getClass(); @@@ -851,13 -903,8 +910,13 @@@ } /** {@inheritDoc} */ - @Override public IgniteLogger log(Class<?> cls) { - return log(cls.getName()); ++ @Override public TraceProcessor trace() { ++ return traceProc; + } + + /** {@inheritDoc} */ - @Override public void markSegmented() { - segFlag = true; + @Override public IgniteLogger log(String ctgr) { + return config().getGridLogger().getLogger(ctgr); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 15ddb77,6b1c995..15e0868 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@@ -145,9 -163,9 +163,10 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.segmentation.GridSegmentationProcessor; import org.apache.ignite.internal.processors.service.GridServiceProcessor; import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor; + import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor; import org.apache.ignite.internal.processors.task.GridTaskProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; +import org.apache.ignite.internal.processors.trace.TraceProcessor; import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions; import org.apache.ignite.internal.suggestions.JvmConfigurationSuggestions; import org.apache.ignite.internal.suggestions.OsConfigurationSuggestions; http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java index 70c5eca,fe61aec..10eddec --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java @@@ -23,7 -23,7 +23,8 @@@ import java.nio.ByteBuffer import org.apache.ignite.internal.ExecutorAwareMessage; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.GridCacheMessage; + import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest; +import org.apache.ignite.internal.processors.trace.IgniteTraceAware; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.Message; http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 2433705,54efb47..a573506 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@@ -141,9 -176,9 +176,10 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse; + import org.apache.ignite.internal.processors.query.schema.message.SchemaOperationStatusMessage; import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultRequest; import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultResponse; +import org.apache.ignite.internal.processors.trace.EventsTrace; import org.apache.ignite.internal.util.GridByteArrayList; import org.apache.ignite.internal.util.GridIntList; import org.apache.ignite.internal.util.GridLongList; http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index c8d5546,9283939..2d084a7 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@@ -32,12 -33,16 +33,16 @@@ import org.apache.ignite.internal.Inval import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture; -import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; + import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; + import org.apache.ignite.internal.processors.cache.mvcc.MvccFuture; + import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.trace.EventsTrace; + import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@@ -354,7 -371,8 +377,9 @@@ public final class GridDhtTxFinishFutur tx.activeCachesDeploymentEnabled(), false, false, + tx.mvccSnapshot(), - tx.filterUpdateCountersForBackupNode(n)); ++ tx.filterUpdateCountersForBackupNode(n), + tx.nodeTrace() != null ? new EventsTrace() : null); try { cctx.io().send(n, req, tx.ioPolicy()); @@@ -458,7 -484,8 +491,9 @@@ updCntrs, false, false, + mvccSnapshot, - commit ? null : tx.filterUpdateCountersForBackupNode(n)); ++ commit ? null : tx.filterUpdateCountersForBackupNode(n), + tx.nodeTrace() != null ? new EventsTrace() : null); req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : tx.xidVersion()); @@@ -528,7 -555,8 +563,9 @@@ tx.activeCachesDeploymentEnabled(), false, false, + mvccSnapshot, - null); ++ null, + tx.nodeTrace() != null ? new EventsTrace() : null); req.writeVersion(tx.writeVersion()); http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java index 9bee5dd,61896b5..96cf37c --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java @@@ -25,8 -25,8 +25,9 @@@ import org.apache.ignite.cache.CacheWri import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest; + import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.trace.EventsTrace; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; @@@ -123,7 -135,8 +136,9 @@@ public class GridDhtTxFinishRequest ext boolean addDepInfo, boolean retVal, boolean waitRemoteTxs, + MvccSnapshot mvccSnapshot, - Collection<PartitionUpdateCountersMessage> updCntrs ++ Collection<PartitionUpdateCountersMessage> updCntrs, + EventsTrace eventsTrace ) { super( xidVer, @@@ -212,7 -228,8 +233,9 @@@ Collection<Long> updateIdxs, boolean retVal, boolean waitRemoteTxs, + MvccSnapshot mvccSnapshot, - Collection<PartitionUpdateCountersMessage> updCntrs ++ Collection<PartitionUpdateCountersMessage> updCntrs, + EventsTrace eventsTrace ) { this(nearNodeId, futId, @@@ -238,7 -255,8 +261,9 @@@ addDepInfo, retVal, waitRemoteTxs, + mvccSnapshot, - updCntrs); ++ updCntrs, + eventsTrace); if (updateIdxs != null && !updateIdxs.isEmpty()) { partUpdateCnt = new GridLongList(updateIdxs.size()); http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index b7f264f,a091d44..751ec3f --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@@ -37,9 -38,7 +38,8 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; - import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.trace.EventsTrace; import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 0d80adf,ffa383b..fa301d7 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@@ -40,9 -45,8 +45,10 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter; + import org.apache.ignite.internal.processors.cache.transactions.TxCounters; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.trace.EventsTrace; +import org.apache.ignite.internal.processors.trace.IgniteTraceAware; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridLeanMap; import org.apache.ignite.internal.util.GridLeanSet; @@@ -98,9 -109,11 +111,14 @@@ public abstract class GridDhtTxLocalAda /** Nodes where transactions were started on lock step. */ private Set<ClusterNode> lockTxNodes; + /** Enlist or lock future what is currently in progress. */ + @SuppressWarnings("UnusedDeclaration") + @GridToStringExclude + protected volatile IgniteInternalFuture<?> lockFut; + + /** */ + protected EventsTrace eventsTrace; + /** * Empty constructor required for {@link Externalizable}. */ @@@ -846,29 -857,72 +871,95 @@@ } /** + * @return Lock future. + */ + public IgniteInternalFuture<?> lockFuture() { + return lockFut; + } + + /** + * Atomically updates lock future. + * + * @param oldFut Old future. + * @param newFut New future. + * @return {@code true} If future was changed. + */ + public boolean updateLockFuture(IgniteInternalFuture<?> oldFut, IgniteInternalFuture<?> newFut) { + return LOCK_FUT_UPD.compareAndSet(this, oldFut, newFut); + } + + /** + * Clears lock future. + * + * @param cond Clear lock condition. + */ + public void clearLockFuture(@Nullable IgniteInternalFuture cond) { + while (true) { + IgniteInternalFuture f = lockFut; + + if (f == null + || f == ROLLBACK_FUT + || (cond != null && f != cond) + || updateLockFuture(f, null)) + return; + } + } + + /** + * @param f Future to finish. + * @param err Error. + * @param clearLockFut {@code True} if need to clear lock future. + * @return Finished future. + */ + public <T> GridFutureAdapter<T> finishFuture(GridFutureAdapter<T> f, Throwable err, boolean clearLockFut) { + if (clearLockFut) + clearLockFuture(null); + + f.onDone(err); + + return f; + } + + /** + * Prepare async rollback. + * + * @return Current lock future or null if it's safe to roll back. + */ + @Nullable public IgniteInternalFuture<?> tryRollbackAsync() { + while (true) { + final IgniteInternalFuture fut = lockFut; + + if (fut == ROLLBACK_FUT) + return null; + else if (updateLockFuture(fut, ROLLBACK_FUT)) + return fut; + } + } + + /** + * @return Node trace. + */ + public EventsTrace nodeTrace() { + return eventsTrace; + } + + /** + * @param eventsTrace Node trace. + */ + public void nodeTrace(EventsTrace eventsTrace) { + this.eventsTrace = eventsTrace; + } + + /** + * @param rmtNodeId Remote node ID. + * @param eventsTrace Node trace to collect. + */ + public void collectNodeTrace(UUID rmtNodeId, EventsTrace eventsTrace) { + if (this.eventsTrace != null && eventsTrace != null) + this.eventsTrace.addRemoteTrace(rmtNodeId, eventsTrace); + } + + /** * @param prepFut Prepare future. * @return If transaction if finished on prepare step returns future which is completed after transaction finish. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 4be7c0d,0edf63f..185093d --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@@ -859,9 -894,10 +897,11 @@@ public final class GridDhtTxPrepareFutu prepErr, null, tx.onePhaseCommit(), - tx.activeCachesDeploymentEnabled()); + tx.activeCachesDeploymentEnabled(), + tx.nodeTrace()); + res.mvccSnapshot(tx.mvccSnapshot()); + if (prepErr == null) { if (tx.needReturnValue() || tx.nearOnOriginatingNode() || tx.hasInterceptor()) addDhtValues(res); @@@ -1236,235 -1296,269 +1300,271 @@@ return; if (last) { - if (tx.onePhaseCommit() && !tx.nearMap().isEmpty()) { - for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) { - if (!tx.dhtMap().containsKey(nearMapping.primary().id())) { - tx.onePhaseCommit(false); + if (waitCrdCntrFut != null) { + skipInit = true; - break; - } - } - } - - int miniId = 0; - - assert tx.transactionNodes() != null; - - final long timeout = timeoutObj != null ? timeoutObj.timeout : 0; + waitCrdCntrFut.listen(new IgniteInClosure<IgniteInternalFuture<MvccSnapshot>>() { + @Override public void apply(IgniteInternalFuture<MvccSnapshot> fut) { + try { + fut.get(); - // Create mini futures. - for (GridDistributedTxMapping dhtMapping : tx.dhtMap().values()) { - assert !dhtMapping.empty(); + sendPrepareRequests(); - ClusterNode n = dhtMapping.primary(); + markInitialized(); + } + catch (Throwable e) { + U.error(log, "Failed to get mvcc version for tx [txId=" + tx.nearXidVersion() + + ", err=" + e + ']', e); - assert !n.isLocal(); + GridNearTxPrepareResponse res = createPrepareResponse(e); - GridDistributedTxMapping nearMapping = tx.nearMap().get(n.id()); + onDone(res, res.error()); + } + } + }); + } + else + sendPrepareRequests(); + } + } + finally { + if (!skipInit) + markInitialized(); + } + } - Collection<IgniteTxEntry> nearWrites = nearMapping == null ? null : nearMapping.writes(); + /** + * + */ + private void sendPrepareRequests() { + if (tx.onePhaseCommit() && !tx.nearMap().isEmpty()) { + for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) { + if (!tx.dhtMap().containsKey(nearMapping.primary().id())) { + tx.onePhaseCommit(false); - Collection<IgniteTxEntry> dhtWrites = dhtMapping.writes(); + break; + } + } + } - if (F.isEmpty(dhtWrites) && F.isEmpty(nearWrites)) - continue; + assert !tx.txState().mvccEnabled(cctx) || !tx.onePhaseCommit() || tx.mvccSnapshot() != null; - if (tx.remainingTime() == -1) - return; + int miniId = 0; - MiniFuture fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping); + assert tx.transactionNodes() != null; - add(fut); // Append new future. + final long timeout = timeoutObj != null ? timeoutObj.timeout : 0; - assert txNodes != null; + // Do not need process active transactions on backups. + MvccSnapshot mvccSnapshot = tx.mvccSnapshot(); - GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest( - futId, - fut.futureId(), - tx.topologyVersion(), - tx, - timeout, - dhtWrites, - nearWrites, - txNodes, - tx.nearXidVersion(), - true, - tx.onePhaseCommit(), - tx.subjectId(), - tx.taskNameHash(), - tx.activeCachesDeploymentEnabled(), - tx.storeWriteThrough(), - retVal, - tx.nodeTrace() != null ? new EventsTrace() : null); + if (mvccSnapshot != null) + mvccSnapshot = mvccSnapshot.withoutActiveTransactions(); - int idx = 0; + // Create mini futures. + for (GridDistributedTxMapping dhtMapping : tx.dhtMap().values()) { + assert !dhtMapping.empty() || dhtMapping.queryUpdate(); - for (IgniteTxEntry entry : dhtWrites) { - try { - GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached(); + ClusterNode n = dhtMapping.primary(); - GridCacheContext<?, ?> cacheCtx = cached.context(); + assert !n.isLocal(); - // Do not invalidate near entry on originating transaction node. - req.invalidateNearEntry(idx, !tx.nearNodeId().equals(n.id()) && - cached.readerId(n.id()) != null); + GridDistributedTxMapping nearMapping = tx.nearMap().get(n.id()); - if (cached.isNewLocked()) { - List<ClusterNode> owners = cacheCtx.topology().owners(cached.partition(), - tx != null ? tx.topologyVersion() : cacheCtx.affinity().affinityTopologyVersion()); + Collection<IgniteTxEntry> nearWrites = nearMapping == null ? null : nearMapping.writes(); - // Do not preload if local node is a partition owner. - if (!owners.contains(cctx.localNode())) - req.markKeyForPreload(idx); - } + Collection<IgniteTxEntry> dhtWrites = dhtMapping.writes(); - break; - } - catch (GridCacheEntryRemovedException ignore) { - assert false : "Got removed exception on entry with dht local candidate: " + entry; - } + if (!dhtMapping.queryUpdate() && F.isEmpty(dhtWrites) && F.isEmpty(nearWrites)) + continue; - idx++; - } + MiniFuture fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping); + + add(fut); // Append new future. + + assert req.transactionNodes() != null; + + GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest( + futId, + fut.futureId(), + tx.topologyVersion(), + tx, + timeout, + dhtWrites, + nearWrites, + this.req.transactionNodes(), + tx.nearXidVersion(), + true, + tx.onePhaseCommit(), + tx.subjectId(), + tx.taskNameHash(), + tx.activeCachesDeploymentEnabled(), + tx.storeWriteThrough(), + retVal, + mvccSnapshot, - tx.filterUpdateCountersForBackupNode(n)); ++ tx.filterUpdateCountersForBackupNode(n), ++ tx.nodeTrace() != null ? new EventsTrace() : null); + + req.queryUpdate(dhtMapping.queryUpdate()); + + int idx = 0; + + for (IgniteTxEntry entry : dhtWrites) { + try { + GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached(); - if (!F.isEmpty(nearWrites)) { - for (IgniteTxEntry entry : nearWrites) { - try { - if (entry.explicitVersion() == null) { - GridCacheMvccCandidate added = entry.cached().candidate(version()); + GridCacheContext<?, ?> cacheCtx = cached.context(); - assert added != null : "Missing candidate for cache entry:" + entry; - assert added.dhtLocal(); + // Do not invalidate near entry on originating transaction node. + req.invalidateNearEntry(idx, !tx.nearNodeId().equals(n.id()) && + cached.readerId(n.id()) != null); - if (added.ownerVersion() != null) - req.owned(entry.txKey(), added.ownerVersion()); - } + if (cached.isNewLocked()) { + List<ClusterNode> owners = cacheCtx.topology().owners(cached.partition(), + tx != null ? tx.topologyVersion() : cacheCtx.affinity().affinityTopologyVersion()); - break; - } - catch (GridCacheEntryRemovedException ignore) { - assert false : "Got removed exception on entry with dht local candidate: " + entry; - } - } + // Do not preload if local node is a partition owner. + if (!owners.contains(cctx.localNode())) + req.markKeyForPreload(idx); } - assert req.transactionNodes() != null; + break; + } + catch (GridCacheEntryRemovedException ignore) { + assert false : "Got removed exception on entry with dht local candidate: " + entry; + } + idx++; + } + + if (!F.isEmpty(nearWrites)) { + for (IgniteTxEntry entry : nearWrites) { try { - cctx.io().send(n, req, tx.ioPolicy()); + if (entry.explicitVersion() == null) { + GridCacheMvccCandidate added = entry.cached().candidate(version()); - if (msgLog.isDebugEnabled()) { - msgLog.debug("DHT prepare fut, sent request dht [txId=" + tx.nearXidVersion() + - ", dhtTxId=" + tx.xidVersion() + - ", node=" + n.id() + ']'); - } - } - catch (ClusterTopologyCheckedException ignored) { - fut.onNodeLeft(); - } - catch (IgniteCheckedException e) { - if (!cctx.kernalContext().isStopping()) { - if (msgLog.isDebugEnabled()) { - msgLog.debug("DHT prepare fut, failed to send request dht [txId=" + tx.nearXidVersion() + - ", dhtTxId=" + tx.xidVersion() + - ", node=" + n.id() + ']'); - } + assert added != null : "Missing candidate for cache entry:" + entry; + assert added.dhtLocal(); - fut.onResult(e); - } - else { - if (msgLog.isDebugEnabled()) { - msgLog.debug("DHT prepare fut, failed to send request dht, ignore [txId=" + tx.nearXidVersion() + - ", dhtTxId=" + tx.xidVersion() + - ", node=" + n.id() + - ", err=" + e + ']'); - } + if (added.ownerVersion() != null) + req.owned(entry.txKey(), added.ownerVersion()); } + + break; + } + catch (GridCacheEntryRemovedException ignore) { + assert false : "Got removed exception on entry with dht local candidate: " + entry; } } + } - for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) { - if (!tx.dhtMap().containsKey(nearMapping.primary().id())) { - if (tx.remainingTime() == -1) - return; + assert req.transactionNodes() != null; - MiniFuture fut = new MiniFuture(nearMapping.primary().id(), ++miniId, null, nearMapping); + try { + cctx.io().send(n, req, tx.ioPolicy()); - add(fut); // Append new future. + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, sent request dht [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + n.id() + ']'); + } + } + catch (ClusterTopologyCheckedException ignored) { + fut.onNodeLeft(); + } + catch (IgniteCheckedException e) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, failed to send request dht [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + n.id() + ']'); + } - GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest( - futId, - fut.futureId(), - tx.topologyVersion(), - tx, - timeout, - null, - nearMapping.writes(), - tx.transactionNodes(), - tx.nearXidVersion(), - true, - tx.onePhaseCommit(), - tx.subjectId(), - tx.taskNameHash(), - tx.activeCachesDeploymentEnabled(), - tx.storeWriteThrough(), - retVal, - tx.nodeTrace() != null ? new EventsTrace() : null); - - for (IgniteTxEntry entry : nearMapping.entries()) { - if (CU.writes().apply(entry)) { - try { - if (entry.explicitVersion() == null) { - GridCacheMvccCandidate added = entry.cached().candidate(version()); + fut.onResult(e); + } + } - assert added != null : "Null candidate for non-group-lock entry " + - "[added=" + added + ", entry=" + entry + ']'; - assert added.dhtLocal() : "Got non-dht-local candidate for prepare future" + - "[added=" + added + ", entry=" + entry + ']'; + for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) { + if (!tx.dhtMap().containsKey(nearMapping.primary().id())) { + if (tx.remainingTime() == -1) + return; - if (added != null && added.ownerVersion() != null) - req.owned(entry.txKey(), added.ownerVersion()); - } + MiniFuture fut = new MiniFuture(nearMapping.primary().id(), ++miniId, null, nearMapping); + + add(fut); // Append new future. + + GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest( + futId, + fut.futureId(), + tx.topologyVersion(), + tx, + timeout, + null, + nearMapping.writes(), + tx.transactionNodes(), + tx.nearXidVersion(), + true, + tx.onePhaseCommit(), + tx.subjectId(), + tx.taskNameHash(), + tx.activeCachesDeploymentEnabled(), + tx.storeWriteThrough(), + retVal, + mvccSnapshot, - null); ++ null, ++ tx.nodeTrace() != null ? new EventsTrace() : null); + + for (IgniteTxEntry entry : nearMapping.entries()) { + if (CU.writes().apply(entry)) { + try { + if (entry.explicitVersion() == null) { + GridCacheMvccCandidate added = entry.cached().candidate(version()); - break; - } catch (GridCacheEntryRemovedException ignore) { - assert false : "Got removed exception on entry with dht local candidate: " + entry; - } + assert added != null : "Null candidate for non-group-lock entry " + + "[added=" + added + ", entry=" + entry + ']'; + assert added.dhtLocal() : "Got non-dht-local candidate for prepare future" + + "[added=" + added + ", entry=" + entry + ']'; + + if (added != null && added.ownerVersion() != null) + req.owned(entry.txKey(), added.ownerVersion()); } + + break; + } catch (GridCacheEntryRemovedException ignore) { + assert false : "Got removed exception on entry with dht local candidate: " + entry; } + } + } - assert req.transactionNodes() != null; + assert req.transactionNodes() != null; - try { - cctx.io().send(nearMapping.primary(), req, tx.ioPolicy()); + try { + cctx.io().send(nearMapping.primary(), req, tx.ioPolicy()); - if (msgLog.isDebugEnabled()) { - msgLog.debug("DHT prepare fut, sent request near [txId=" + tx.nearXidVersion() + - ", dhtTxId=" + tx.xidVersion() + - ", node=" + nearMapping.primary().id() + ']'); - } - } - catch (ClusterTopologyCheckedException ignored) { - fut.onNodeLeft(); + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, sent request near [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nearMapping.primary().id() + ']'); + } + } + catch (ClusterTopologyCheckedException ignored) { + fut.onNodeLeft(); + } + catch (IgniteCheckedException e) { + if (!cctx.kernalContext().isStopping()) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, failed to send request near [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nearMapping.primary().id() + ']'); } - catch (IgniteCheckedException e) { - if (!cctx.kernalContext().isStopping()) { - if (msgLog.isDebugEnabled()) { - msgLog.debug("DHT prepare fut, failed to send request near [txId=" + tx.nearXidVersion() + - ", dhtTxId=" + tx.xidVersion() + - ", node=" + nearMapping.primary().id() + ']'); - } - fut.onResult(e); - } - else { - if (msgLog.isDebugEnabled()) { - msgLog.debug("DHT prepare fut, failed to send request near, ignore [txId=" + tx.nearXidVersion() + - ", dhtTxId=" + tx.xidVersion() + - ", node=" + nearMapping.primary().id() + - ", err=" + e + ']'); - } - } + fut.onResult(e); + } + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, failed to send request near, ignore [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nearMapping.primary().id() + + ", err=" + e + ']'); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java index 1de6cb1,30e8ceb..e8d6bbd --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java @@@ -143,8 -155,8 +156,10 @@@ public class GridDhtTxPrepareRequest ex boolean addDepInfo, boolean storeWriteThrough, boolean retVal, + MvccSnapshot mvccSnapshot, - Collection<PartitionUpdateCountersMessage> counters) { ++ Collection<PartitionUpdateCountersMessage> counters, + EventsTrace eventsTrace + ) { super(tx, timeout, null, @@@ -174,9 -187,7 +191,12 @@@ nearNodeId = tx.nearNodeId(); + skipCompletedVers = tx.xidVersion() == tx.nearXidVersion(); ++ ++ + this.eventsTrace = eventsTrace; + + recordTracePoint(TracePoint.DHT_PREPARE_REQUEST_CREATED); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 87499a8,140c1d5..3f61b1a --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@@ -40,10 -42,7 +42,8 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.trace.EventsTrace; import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; - import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; - import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@@ -556,8 -583,8 +586,9 @@@ public class GridNearOptimisticSerializ tx.subjectId(), tx.taskNameHash(), m.clientFirst(), + txNodes.size() == 1, - tx.activeCachesDeploymentEnabled()); + tx.activeCachesDeploymentEnabled(), + cctx.kernalContext().trace().tracingEnabled() ? new EventsTrace() : null); for (IgniteTxEntry txEntry : writes) { if (txEntry.op() == TRANSFORM) http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index bd7f2a2,06d7a8c..42ec5ca --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@@ -44,7 -48,7 +48,8 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock; + import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.trace.EventsTrace; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; @@@ -538,8 -573,8 +576,9 @@@ public class GridNearOptimisticTxPrepar tx.subjectId(), tx.taskNameHash(), m.clientFirst(), + true, - tx.activeCachesDeploymentEnabled()); + tx.activeCachesDeploymentEnabled(), - cctx.kernalContext().trace().tracingEnabled() ? new EventsTrace() : null); ++ cctx.kernalContext().trace().tracingEnabled() ? new EventsTrace() : null)); for (IgniteTxEntry txEntry : m.entries()) { if (txEntry.op() == TRANSFORM) http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index 45ff1df,85a48a3..37fd8de --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@@ -33,12 -34,13 +34,15 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; - import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; + import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping; + import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; + import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; + import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshotResponseListener; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; +import org.apache.ignite.internal.processors.trace.EventsTrace; +import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.C1; @@@ -213,8 -228,10 +232,11 @@@ public class GridNearPessimisticTxPrepa tx.subjectId(), tx.taskNameHash(), false, + true, - tx.activeCachesDeploymentEnabled()); + tx.activeCachesDeploymentEnabled(), - cctx.kernalContext().trace().tracingEnabled() ? new EventsTrace() : null); ++ cctx.kernalContext().trace().tracingEnabled() ? new EventsTrace() : null)); + + req.queryUpdate(m.queryUpdate()); for (IgniteTxEntry txEntry : writes) { if (txEntry.op() == TRANSFORM) http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index aeecf87,4a4d8e3..fd8ca5a --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@@ -44,10 -49,9 +49,11 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.trace.EventsTrace; +import org.apache.ignite.internal.processors.trace.IgniteTraceAware; import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; + import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; @@@ -392,15 -401,12 +413,14 @@@ public final class GridNearTxFinishFutu fut.getClass() == CheckRemoteTxMiniFuture.class; } - /** - * Initializes future. - * - * @param commit Commit flag. - */ - @SuppressWarnings("ForLoopReplaceableByForEach") - void finish(boolean commit) { + /** {@inheritDoc} */ + @Override @SuppressWarnings("ForLoopReplaceableByForEach") + public void finish(final boolean commit, final boolean clearThreadMap, final boolean onTimeout) { + tx.recordTracePoint(IgniteTraceAware.TracePoint.TX_COMMIT); + + if (!cctx.mvcc().addFuture(this, futureId())) + return; + if (tx.onNeedCheckBackup()) { assert tx.onePhaseCommit(); @@@ -724,8 -806,8 +819,9 @@@ tx.size(), tx.subjectId(), tx.taskNameHash(), + tx.mvccSnapshot(), - tx.activeCachesDeploymentEnabled() + tx.activeCachesDeploymentEnabled(), + cctx.kernalContext().trace().tracingEnabled() ? new EventsTrace() : null ); // If this is the primary node for the keys. @@@ -860,7 -947,8 +961,9 @@@ tx.activeCachesDeploymentEnabled(), !waitRemoteTxs && (tx.needReturnValue() && tx.implicit()), waitRemoteTxs, + null, - null); ++ null, + cctx.kernalContext().trace().tracingEnabled() ? new EventsTrace() : null); finishReq.checkCommitted(true); http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java index 5fb9991,6b5aa90..56200eb --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java @@@ -24,9 -24,10 +24,11 @@@ import java.util.UUID import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest; + import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.trace.EventsTrace; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; + import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@@ -88,9 -92,8 +93,10 @@@ public class GridNearTxFinishRequest ex int txSize, @Nullable UUID subjId, int taskNameHash, + MvccSnapshot mvccSnapshot, - boolean addDepInfo) { + boolean addDepInfo, + EventsTrace eventsTrace + ) { super( xidVer, futId, @@@ -115,7 -117,7 +121,9 @@@ explicitLock(explicitLock); storeEnabled(storeEnabled); + this.mvccSnapshot = mvccSnapshot; ++ + recordTracePoint(TracePoint.NEAR_FINISH_REQUEST_CREATED); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index a9a2e99,111f5d2..7be33cc --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@@ -66,8 -70,11 +70,12 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.transactions.TransactionProxy; import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl; + import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyRollbackOnlyImpl; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; + import org.apache.ignite.internal.processors.query.EnlistOperation; + import org.apache.ignite.internal.processors.query.UpdateSourceIterator; + import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; +import org.apache.ignite.internal.processors.trace.EventsTrace; import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; @@@ -225,14 -255,23 +256,27 @@@ public class GridNearTxLocal extends Gr false, txSize, subjId, - taskNameHash); + taskNameHash, + ctx.kernalContext().trace().tracingEnabled() ? new EventsTrace() : null); + + recordTracePoint(TracePoint.TX_CREATE); + + this.lb = lb; + mappings = implicitSingle ? new IgniteTxMappingsSingleImpl() : new IgniteTxMappingsImpl(); + this.mvccOp = mvccOp; + initResult(); + + trackTimeout = timeout() > 0 && !implicit() && cctx.time().addTimeoutObject(this); + } + + /** + * @return Mvcc query version tracker. + */ + public MvccQueryTracker mvccQueryTracker() { + return mvccTracker; } /** {@inheritDoc} */ @@@ -3099,8 -3714,14 +3719,16 @@@ if (!PREP_FUT_UPD.compareAndSet(this, null, fut)) return prepFut; + if (trackTimeout) { + prepFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { + GridNearTxLocal.this.removeTimeoutHandler(); + } + }); + } + + recordTracePoint(TracePoint.TX_PREPARE); + if (timeout == -1) { fut.onDone(this, timeoutException()); @@@ -3153,55 -3771,45 +3778,49 @@@ if (log.isDebugEnabled()) log.debug("Committing near local tx: " + this); - if (fastFinish()) { - state(PREPARING); - state(PREPARED); - state(COMMITTING); + final NearTxFinishFuture fut; + final NearTxFinishFuture fut0 = finishFut; - cctx.tm().fastFinishTx(this, true); + boolean fastFinish; - state(COMMITTED); + if (fut0 != null || !FINISH_FUT_UPD.compareAndSet(this, null, fut = finishFuture(fastFinish = fastFinish(), true))) + return chainFinishFuture(finishFut, true, true, false); - recordTracePoint(TracePoint.TX_END); - - return new GridFinishedFuture<>((IgniteInternalTx)this); - } - - final IgniteInternalFuture<?> prepareFut = prepareNearTxLocal(); - - GridNearTxFinishFuture fut = commitFut; - - if (fut == null && - !COMMIT_FUT_UPD.compareAndSet(this, null, fut = new GridNearTxFinishFuture<>(cctx, this, true))) - return commitFut; - - cctx.mvcc().addFuture(fut, fut.futureId()); + if (!fastFinish) { + final IgniteInternalFuture<?> prepareFut = prepareNearTxLocal(); - prepareFut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> f) { - GridNearTxFinishFuture fut0 = commitFut; + prepareFut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { + try { + // Make sure that here are no exceptions. + prepareFut.get(); - try { - // Make sure that here are no exceptions. - prepareFut.get(); + fut.finish(true, true, false); + } + catch (Error | RuntimeException e) { + COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e); - fut0.finish(true); - } - catch (Error | RuntimeException e) { - COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e); + fut.finish(false, true, false); - fut0.finish(false); + throw e; + } + catch (IgniteCheckedException e) { + COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e); - throw e; + if (!(e instanceof NodeStoppingException)) + fut.finish(false, true, true); + else + fut.onNodeStop(e); + } } - catch (IgniteCheckedException e) { - COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e); + }); + } - else ++ else { ++ // TODO is this needed? ++ recordTracePoint(TracePoint.TX_END); + - if (!(e instanceof NodeStoppingException)) - fut0.finish(false); - } - } - }); + fut.finish(true, false, false); ++ } return fut; } http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java index 3a31c62,55c809d..3a7a788 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java @@@ -117,8 -124,8 +125,9 @@@ public class GridNearTxPrepareRequest e @Nullable UUID subjId, int taskNameHash, boolean firstClientReq, + boolean allowWaitTopFut, - boolean addDepInfo + boolean addDepInfo, + EventsTrace eventsTrace ) { super(tx, timeout, http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java index a7e9ca3,e9865df..2f259e2 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java @@@ -33,9 -33,9 +33,10 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxPrepareResponse; + import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.trace.EventsTrace; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java index 6b19db1,3f35c5f..a7a4bd0 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java @@@ -68,9 -80,9 +80,10 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.segmentation.GridSegmentationProcessor; import org.apache.ignite.internal.processors.service.GridServiceProcessor; import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor; + import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor; import org.apache.ignite.internal.processors.task.GridTaskProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; +import org.apache.ignite.internal.processors.trace.TraceProcessor; import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions; import org.apache.ignite.internal.util.IgniteExceptionRegistry; import org.apache.ignite.internal.util.StripedExecutor; @@@ -351,12 -468,14 +469,19 @@@ public class StandaloneGridKernalContex } /** {@inheritDoc} */ + @Override public TraceProcessor trace() { + return null; + } + + /** {@inheritDoc} */ - @Override public void markSegmented() { } + @Override public MvccProcessor coordinators() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean invalid() { + return false; + } /** {@inheritDoc} */ @Override public boolean segmented() { http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index fb1f1e2,314bb52..7630136 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@@ -347,53 -382,77 +383,78 @@@ public class IgniteTxHandler } try { - if (top != null && needRemap(req.topologyVersion(), top.topologyVersion(), req)) { - if (txPrepareMsgLog.isDebugEnabled()) { - txPrepareMsgLog.debug("Topology version mismatch for near prepare, need remap transaction [" + - "txId=" + req.version() + - ", node=" + nearNodeId + - ", reqTopVer=" + req.topologyVersion() + - ", locTopVer=" + top.topologyVersion() + - ", req=" + req + ']'); - } + if (top != null ) { + boolean retry = false; - GridNearTxPrepareResponse res = new GridNearTxPrepareResponse( - req.partition(), - req.version(), - req.futureId(), - req.miniId(), - req.version(), - req.version(), - null, - null, - top.topologyVersion(), - req.onePhaseCommit(), - req.deployInfo() != null, - req.nodeTrace()); + GridDhtTopologyFuture topFut = top.topologyVersionFuture(); - try { - ctx.io().send(nearNodeId, res, req.policy()); + if (!req.allowWaitTopologyFuture() && !topFut.isDone()) { + retry = true; if (txPrepareMsgLog.isDebugEnabled()) { - txPrepareMsgLog.debug("Sent remap response for near prepare [txId=" + req.version() + - ", node=" + nearNodeId + ']'); + txPrepareMsgLog.debug("Topology change is in progress, need remap transaction [" + + "txId=" + req.version() + + ", node=" + nearNode.id() + + ", reqTopVer=" + req.topologyVersion() + + ", locTopVer=" + top.readyTopologyVersion() + + ", req=" + req + ']'); } } - catch (ClusterTopologyCheckedException ignored) { + + if (!retry && needRemap(req.topologyVersion(), top.readyTopologyVersion(), req)) { + retry = true; + if (txPrepareMsgLog.isDebugEnabled()) { - txPrepareMsgLog.debug("Failed to send remap response for near prepare, node failed [" + + txPrepareMsgLog.debug("Topology version mismatch for near prepare, need remap transaction [" + "txId=" + req.version() + - ", node=" + nearNodeId + ']'); + ", node=" + nearNode.id() + + ", reqTopVer=" + req.topologyVersion() + + ", locTopVer=" + top.readyTopologyVersion() + + ", req=" + req + ']'); } } - catch (IgniteCheckedException e) { - U.error(txPrepareMsgLog, "Failed to send remap response for near prepare " + - "[txId=" + req.version() + - ", node=" + nearNodeId + - ", req=" + req + ']', e); + + if (retry) { + GridNearTxPrepareResponse res = new GridNearTxPrepareResponse( + req.partition(), + req.version(), + req.futureId(), + req.miniId(), + req.version(), + req.version(), + null, + null, + top.lastTopologyChangeVersion(), + req.onePhaseCommit(), - req.deployInfo() != null); ++ req.deployInfo() != null, ++ req.nodeTrace()); + + try { + ctx.io().send(nearNode, res, req.policy()); + + if (txPrepareMsgLog.isDebugEnabled()) { + txPrepareMsgLog.debug("Sent remap response for near prepare [txId=" + req.version() + + ", node=" + nearNode.id() + ']'); + } + } + catch (ClusterTopologyCheckedException ignored) { + if (txPrepareMsgLog.isDebugEnabled()) { + txPrepareMsgLog.debug("Failed to send remap response for near prepare, node failed [" + + "txId=" + req.version() + + ", node=" + nearNode.id() + ']'); + } + } + catch (IgniteCheckedException e) { + U.error(txPrepareMsgLog, "Failed to send remap response for near prepare " + + "[txId=" + req.version() + + ", node=" + nearNode.id() + + ", req=" + req + ']', e); + } + + return new GridFinishedFuture<>(res); } - return new GridFinishedFuture<>(res); + assert topFut.isDone(); } tx = new GridDhtTxLocal( @@@ -788,8 -943,7 +945,8 @@@ req.threadId(), req.futureId(), req.miniId(), - new IgniteCheckedException("Transaction has been already completed."), - new IgniteTxRollbackCheckedException("Transaction has been already completed or not started yet.")); ++ new IgniteTxRollbackCheckedException("Transaction has been already completed or not started yet."), + req.nodeTrace()); try { ctx.io().send(nodeId, res, req.policy()); http://git-wip-us.apache.org/repos/asf/ignite/blob/ec2526f4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index ae533cb,e4c96b4..ae6c4e9 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@@ -55,8 -55,8 +55,9 @@@ import org.apache.ignite.configuration. import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.trace.IgniteTraceAware; import org.apache.ignite.internal.util.GridConcurrentHashSet; + import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter; import org.apache.ignite.internal.util.tostring.GridToStringExclude;