http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index e67e60f..a5b2202 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture; @@ -44,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFini import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest; import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse; @@ -175,6 +177,12 @@ public class IgniteTxHandler { } }); + ctx.io().addHandler(0, GridDhtTxOnePhaseCommitAckRequest.class, new CI2<UUID, GridCacheMessage>() { + @Override public void apply(UUID nodeId, GridCacheMessage msg) { + processDhtTxOnePhaseCommitAckRequest(nodeId, (GridDhtTxOnePhaseCommitAckRequest)msg); + } + }); + ctx.io().addHandler(0, GridDhtTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processDhtTxFinishResponse(nodeId, (GridDhtTxFinishResponse)msg); @@ -882,7 +890,7 @@ public class IgniteTxHandler { * @param nodeId Sender node ID. * @param req Request. */ - protected final void processDhtTxPrepareRequest(UUID nodeId, GridDhtTxPrepareRequest req) { + protected final void processDhtTxPrepareRequest(final UUID nodeId, final GridDhtTxPrepareRequest req) { if (txPrepareMsgLog.isDebugEnabled()) { txPrepareMsgLog.debug("Received dht prepare request [txId=" + req.nearXidVersion() + ", dhtTxId=" + req.version() + @@ -918,14 +926,15 @@ public class IgniteTxHandler { if (dhtTx != null) { dhtTx.onePhaseCommit(true); + dhtTx.needReturnValue(req.needReturnValue()); - finish(nodeId, dhtTx, req); + finish(dhtTx, req); } if (nearTx != null) { nearTx.onePhaseCommit(true); - finish(nodeId, nearTx, req); + finish(nearTx, req); } } } @@ -950,38 +959,60 @@ public class IgniteTxHandler { req.deployInfo() != null); } - try { - // Reply back to sender. 
- ctx.io().send(nodeId, res, req.policy()); + if (req.onePhaseCommit()) { + IgniteInternalFuture completeFut; - if (txPrepareMsgLog.isDebugEnabled()) { - txPrepareMsgLog.debug("Sent dht prepare response [txId=" + req.nearXidVersion() + - ", dhtTxId=" + req.version() + - ", node=" + nodeId + ']'); - } - } - catch (IgniteCheckedException e) { - if (e instanceof ClusterTopologyCheckedException) { - if (txPrepareMsgLog.isDebugEnabled()) { - txPrepareMsgLog.debug("Failed to send dht prepare response, node left [txId=" + req.nearXidVersion() + - ", dhtTxId=" + req.version() + - ", node=" + nodeId + ']'); - } - } - else { - U.warn(log, "Failed to send tx response to remote node (will rollback transaction) [" + - "txId=" + req.nearXidVersion() + - ", dhtTxId=" + req.version() + - ", node=" + nodeId + - ", err=" + e.getMessage() + ']'); + IgniteInternalFuture<IgniteInternalTx> dhtFin = dhtTx == null ? + null : dhtTx.done() ? null : dhtTx.finishFuture(); + + final IgniteInternalFuture<IgniteInternalTx> nearFin = nearTx == null ? + null : nearTx.done() ? null : nearTx.finishFuture(); + + if (dhtFin != null && nearFin != null) { + GridCompoundFuture fut = new GridCompoundFuture(); + + fut.add(dhtFin); + fut.add(nearFin); + + fut.markInitialized(); + + completeFut = fut; } + else + completeFut = dhtFin != null ? dhtFin : nearFin; - if (nearTx != null) - nearTx.rollback(); + if (completeFut != null) { + final GridDhtTxPrepareResponse res0 = res; + final GridDhtTxRemote dhtTx0 = dhtTx; + final GridNearTxRemote nearTx0 = nearTx; - if (dhtTx != null) - dhtTx.rollback(); + completeFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { + @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) { + sendReply(nodeId, req, res0, dhtTx0, nearTx0); + } + }); + } + else + sendReply(nodeId, req, res, dhtTx, nearTx); } + else + sendReply(nodeId, req, res, dhtTx, nearTx); + } + + /** + * @param nodeId Node ID. + * @param req Request. 
+ */ + protected final void processDhtTxOnePhaseCommitAckRequest(final UUID nodeId, + final GridDhtTxOnePhaseCommitAckRequest req) { + assert nodeId != null; + assert req != null; + + if (log.isDebugEnabled()) + log.debug("Processing dht tx one phase commit ack request [nodeId=" + nodeId + ", req=" + req + ']'); + + for (GridCacheVersion ver : req.versions()) + ctx.tm().removeTxReturn(ver); } /** @@ -1139,12 +1170,10 @@ public class IgniteTxHandler { } /** - * @param nodeId Node ID. * @param tx Transaction. * @param req Request. */ protected void finish( - UUID nodeId, GridDistributedTxRemoteAdapter tx, GridDhtTxPrepareRequest req) throws IgniteTxHeuristicCheckedException { assert tx != null : "No transaction for one-phase commit prepare request: " + req; @@ -1177,6 +1206,52 @@ public class IgniteTxHandler { } /** + * @param nodeId Node id. + * @param req Request. + * @param res Response. + * @param dhtTx Dht tx. + * @param nearTx Near tx. + */ + protected void sendReply(UUID nodeId, + GridDhtTxPrepareRequest req, + GridDhtTxPrepareResponse res, + GridDhtTxRemote dhtTx, + GridNearTxRemote nearTx) { + try { + // Reply back to sender. 
+ ctx.io().send(nodeId, res, req.policy()); + + if (txPrepareMsgLog.isDebugEnabled()) { + txPrepareMsgLog.debug("Sent dht prepare response [txId=" + req.nearXidVersion() + + ", dhtTxId=" + req.version() + + ", node=" + nodeId + ']'); + } + } + catch (IgniteCheckedException e) { + if (e instanceof ClusterTopologyCheckedException) { + if (txPrepareMsgLog.isDebugEnabled()) { + txPrepareMsgLog.debug("Failed to send dht prepare response, node left [txId=" + req.nearXidVersion() + + ", dhtTxId=" + req.version() + + ", node=" + nodeId + ']'); + } + } + else { + U.warn(log, "Failed to send tx response to remote node (will rollback transaction) [" + + "txId=" + req.nearXidVersion() + + ", dhtTxId=" + req.version() + + ", node=" + nodeId + + ", err=" + e.getMessage() + ']'); + } + + if (nearTx != null) + nearTx.rollback(); + + if (dhtTx != null) + dhtTx.rollback(); + } + } + + /** * Sends tx finish response to remote node, if response is requested. * * @param nodeId Node id that originated finish request. @@ -1191,7 +1266,26 @@ public class IgniteTxHandler { if (req.checkCommitted()) { res.checkCommitted(true); - if (!committed) { + if (committed) { + if (req.needReturnValue()) { + try { + GridCacheReturnCompletableWrapper wrapper = ctx.tm().getCommittedTxReturn(req.version()); + + if (wrapper != null) + res.returnValue(wrapper.fut().get()); + else + assert !ctx.discovery().alive(nodeId) : nodeId; + } + catch (IgniteCheckedException e) { + if (txFinishMsgLog.isDebugEnabled()) { + txFinishMsgLog.debug("Failed to gain entry processor return value. [txId=" + nearTxId + + ", dhtTxId=" + req.version() + + ", node=" + nodeId + ']'); + } + } + } + } + else { ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException("Primary node left grid."); @@ -1492,8 +1586,7 @@ public class IgniteTxHandler { * @param req Request. 
*/ protected void processCheckPreparedTxRequest(final UUID nodeId, - final GridCacheTxRecoveryRequest req) - { + final GridCacheTxRecoveryRequest req) { if (txRecoveryMsgLog.isDebugEnabled()) { txRecoveryMsgLog.debug("Received tx recovery request [txId=" + req.nearXidVersion() + ", node=" + nodeId + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 637f322..fe69536 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -151,9 +151,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig /** Commit error. */ protected volatile Throwable commitErr; - /** Need return value. */ - protected boolean needRetVal; - /** Implicit transaction result. */ protected GridCacheReturn implicitRes; @@ -355,13 +352,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig } /** - * @return Flag indicating whether transaction needs return value. - */ - public boolean needReturnValue() { - return needRetVal; - } - - /** * @return {@code True} if transaction participates in a cache that has an interceptor configured. */ public boolean hasInterceptor() { @@ -369,13 +359,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig } /** - * @param needRetVal Need return value flag. - */ - public void needReturnValue(boolean needRetVal) { - this.needRetVal = needRetVal; - } - - /** * @param snd {@code True} if values in tx entries should be replaced with transformed values and sent * to remote nodes. 
*/ @@ -703,7 +686,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig txEntry.cached().unswap(false); IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(txEntry, - true); + true, null); GridCacheVersion dhtVer = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index f9357f9..a1580a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -39,6 +39,7 @@ import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -49,7 +50,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture; +import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper; import 
org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; +import org.apache.ignite.internal.processors.cache.GridDeferredAckMessageSender; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion; import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxFinishSync; import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture; @@ -57,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLo import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxRemoteAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxRemote; import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedLockFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; @@ -87,8 +92,11 @@ import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionState; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; +import org.jsr166.ConcurrentLinkedDeque8; import org.jsr166.ConcurrentLinkedHashMap; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT; import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_COMPLETED_TX_COUNT; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SLOW_TX_WARN_TIMEOUT; import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS; @@ -123,6 +131,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { /** Tx salvage timeout (default 3s). */ private static final int TX_SALVAGE_TIMEOUT = Integer.getInteger(IGNITE_TX_SALVAGE_TIMEOUT, 100); + /** One phase commit deferred ack request timeout. */ + public static final int DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT = + Integer.getInteger(IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT, 500); + + /** One phase commit deferred ack request buffer size. */ + private static final int DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE = + Integer.getInteger(IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE, 256); + /** Version in which deadlock detection introduced. */ public static final IgniteProductVersion TX_DEADLOCK_DETECTION_SINCE = IgniteProductVersion.fromString("1.5.19"); @@ -160,7 +176,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT)); /** Committed local transactions. */ - private final ConcurrentLinkedHashMap<GridCacheVersion, Boolean> completedVersHashMap = + private final ConcurrentLinkedHashMap<GridCacheVersion, Object> completedVersHashMap = new ConcurrentLinkedHashMap<>( Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT), 0.75f, @@ -168,6 +184,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT), PER_SEGMENT_Q); + /** Pending one phase commit ack requests sender. */ + private GridDeferredAckMessageSender deferredAckMessageSender; + /** Transaction finish synchronizer. 
*/ private GridCacheTxFinishSync txFinishSync; @@ -209,6 +228,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { for (TxDeadlockFuture fut : deadlockDetectFuts.values()) fut.onNodeLeft(nodeId); + + for (Map.Entry<GridCacheVersion, Object> entry : completedVersHashMap.entrySet()) { + Object obj = entry.getValue(); + + if (obj instanceof GridCacheReturnCompletableWrapper && + nodeId.equals(((GridCacheReturnCompletableWrapper)obj).nodeId())) + removeTxReturn(entry.getKey()); + } } }, EVT_NODE_FAILED, EVT_NODE_LEFT); @@ -237,6 +264,33 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { txFinishSync = new GridCacheTxFinishSync<>(cctx); txHnd = new IgniteTxHandler(cctx); + + deferredAckMessageSender = new GridDeferredAckMessageSender(cctx.time(), cctx.kernalContext().closure()) { + @Override public int getTimeout() { + return DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT; + } + + @Override public int getBufferSize() { + return DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE; + } + + @Override public void finish(UUID nodeId, ConcurrentLinkedDeque8<GridCacheVersion> vers) { + GridDhtTxOnePhaseCommitAckRequest ackReq = new GridDhtTxOnePhaseCommitAckRequest(vers); + + cctx.kernalContext().gateway().readLock(); + + try { + cctx.io().send(nodeId, ackReq, GridIoPolicy.SYSTEM_POOL); + } + catch (IgniteCheckedException e) { + log.error("Failed to send one phase commit ack to backup node [backup=" + + nodeId + ']', e); + } + finally { + cctx.kernalContext().gateway().readUnlock(); + } + } + }; } /** {@inheritDoc} */ @@ -898,9 +952,13 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { */ public void addCommittedTx(IgniteInternalTx tx) { addCommittedTx(tx, tx.xidVersion(), tx.nearXidVersion()); + } - if (!tx.local() && !tx.near() && tx.onePhaseCommit()) - addCommittedTx(tx, tx.nearXidVersion(), null); + /** + * @param tx Committed transaction. 
+ */ + public void addCommittedTxReturn(IgniteInternalTx tx, GridCacheReturnCompletableWrapper ret) { + addCommittedTxReturn(tx.nearXidVersion(), null, ret); } /** @@ -925,7 +983,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { if (nearXidVer != null) xidVer = new CommittedVersion(xidVer, nearXidVer); - Boolean committed0 = completedVersHashMap.putIfAbsent(xidVer, true); + Object committed0 = completedVersHashMap.putIfAbsent(xidVer, true); if (committed0 == null && (tx == null || tx.needsCompletedVersions())) { Boolean b = completedVersSorted.putIfAbsent(xidVer, true); @@ -933,7 +991,29 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { assert b == null; } - return committed0 == null || committed0; + Boolean committed = committed0 != null && !committed0.equals(Boolean.FALSE); + + return committed0 == null || committed; + } + + /** + * @param xidVer Completed transaction version. + * @param nearXidVer Optional near transaction ID. + * @param retVal Invoke result. + */ + private void addCommittedTxReturn( + GridCacheVersion xidVer, + @Nullable GridCacheVersion nearXidVer, + GridCacheReturnCompletableWrapper retVal + ) { + assert retVal != null; + + if (nearXidVer != null) + xidVer = new CommittedVersion(xidVer, nearXidVer); + + Object prev = completedVersHashMap.putIfAbsent(xidVer, retVal); + + assert prev == null || Boolean.FALSE.equals(prev) : prev; // Can be rolled back. 
} /** @@ -945,7 +1025,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { IgniteInternalTx tx, GridCacheVersion xidVer ) { - Boolean committed0 = completedVersHashMap.putIfAbsent(xidVer, false); + Object committed0 = completedVersHashMap.putIfAbsent(xidVer, false); if (committed0 == null && (tx == null || tx.needsCompletedVersions())) { Boolean b = completedVersSorted.putIfAbsent(xidVer, false); @@ -953,7 +1033,47 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { assert b == null; } - return committed0 == null || !committed0; + Boolean committed = committed0 != null && !committed0.equals(Boolean.FALSE); + + return committed0 == null || !committed; + } + + /** + * @param xidVer Completed transaction version. + * @return Tx result, or {@code null} if no one phase commit return value is stored for this version. + */ + public GridCacheReturnCompletableWrapper getCommittedTxReturn(GridCacheVersion xidVer) { + Object retVal = completedVersHashMap.get(xidVer); + + // Will gain true in regular case or GridCacheReturn in onePhaseCommit case. + if (!Boolean.TRUE.equals(retVal)) { + assert !Boolean.FALSE.equals(retVal); // Method should be used only after 'committed' checked. + + GridCacheReturnCompletableWrapper res = (GridCacheReturnCompletableWrapper)retVal; + + removeTxReturn(xidVer); + + return res; + } + else + return null; + } + + /** + * @param xidVer Completed transaction version whose stored return value is replaced by the plain committed flag. + */ + public void removeTxReturn(GridCacheVersion xidVer) { + Object prev = completedVersHashMap.get(xidVer); + + if (prev == null || Boolean.FALSE.equals(prev)) // Entry can be absent (map is bounded) or tx can be rolled back. + return; + + assert prev instanceof GridCacheReturnCompletableWrapper: + prev + " instead of GridCacheReturnCompletableWrapper"; + + boolean res = completedVersHashMap.replace(xidVer, prev, true); + + assert res; } /** @@ -1086,7 +1206,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * so we don't do it here.
*/ - Boolean committed = completedVersHashMap.get(tx.xidVersion()); + Object committed0 = completedVersHashMap.get(tx.xidVersion()); + + Boolean committed = committed0 != null && !committed0.equals(Boolean.FALSE); // 1. Make sure that committed version has been recorded. if (!((committed != null && committed) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) { @@ -1672,12 +1794,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { boolean committed = false; - for (Map.Entry<GridCacheVersion, Boolean> entry : completedVersHashMap.entrySet()) { + for (Map.Entry<GridCacheVersion, Object> entry : completedVersHashMap.entrySet()) { if (entry.getKey() instanceof CommittedVersion) { CommittedVersion comm = (CommittedVersion)entry.getKey(); if (comm.nearVer.equals(xidVer)) { - committed = entry.getValue(); + committed = !entry.getValue().equals(Boolean.FALSE); break; } @@ -1809,8 +1931,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { // Not all transactions were found. Need to scan committed versions to check // if transaction was already committed. - for (Map.Entry<GridCacheVersion, Boolean> e : completedVersHashMap.entrySet()) { - if (!e.getValue()) + for (Map.Entry<GridCacheVersion, Object> e : completedVersHashMap.entrySet()) { + if (e.getValue().equals(Boolean.FALSE)) continue; GridCacheVersion ver = e.getKey(); @@ -2137,6 +2259,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** + * @param nodeId Node ID to send message to. + * @param ver Version to ack. + */ + public void sendDeferredAckResponse(UUID nodeId, GridCacheVersion ver) { + deferredAckMessageSender.sendDeferredAckMessage(nodeId, ver); + } + + /** * @return Collection of active transaction deadlock detection futures. 
*/ @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java index e611723..c3d194b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; @@ -30,7 +31,6 @@ import javax.cache.integration.CacheLoaderException; import javax.cache.integration.CacheWriterException; import javax.cache.processor.EntryProcessorResult; import javax.cache.processor.MutableEntry; - import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheAtomicWriteOrderMode; @@ -43,8 +43,10 @@ import org.apache.ignite.configuration.AtomicConfiguration; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; @@ -59,6 +61,7 @@ import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager.DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT; import static org.apache.ignite.testframework.GridTestUtils.TestMemoryMode; import static org.apache.ignite.testframework.GridTestUtils.runAsync; @@ -70,7 +73,7 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); /** */ - private static final long DURATION = 60_000; + protected static final long DURATION = 60_000; /** */ protected static final int GRID_CNT = 4; @@ -78,8 +81,8 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst /** * @return Keys count for the test. */ - private int keysCount() { - return 10_000; + protected int keysCount() { + return 2_000; } /** @@ -249,12 +252,17 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst IgniteInternalFuture<Object> fut = runAsync(new Callable<Object>() { @Override public Object call() throws Exception { + Random rnd = new Random(); + while (!finished.get()) { stopGrid(3); U.sleep(300); startGrid(3); + + if (rnd.nextBoolean()) // OPC possible only when there is no migration from one backup to another. 
+ awaitPartitionMapExchange(); } return null; @@ -456,6 +464,29 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst assertTrue("Unexpected atomic futures: " + futs, futs.isEmpty()); } + + checkOnePhaseCommitReturnValuesCleaned(); + } + + /** + * + */ + protected void checkOnePhaseCommitReturnValuesCleaned() throws IgniteInterruptedCheckedException { + U.sleep(DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT); + + for (int i = 0; i < GRID_CNT; i++) { + IgniteKernal ignite = (IgniteKernal)grid(i); + + IgniteTxManager tm = ignite.context().cache().context().tm(); + + Map completedVersHashMap = U.field(tm, "completedVersHashMap"); + + for (Object o : completedVersHashMap.values()) { + assertTrue("completedVersHashMap contains" + o.getClass() + " instead of boolean. " + + "These values should be replaced by boolean after onePhaseCommit finished. " + + "[node=" + i + "]", o instanceof Boolean); + } + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java index 9204bc8..9bfde27 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.util.HashSet; +import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; 
import java.util.concurrent.ThreadLocalRandom; @@ -88,16 +89,6 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr } } - /** {@inheritDoc} */ - @Override public void testGetAndPut() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-1525"); - } - - /** {@inheritDoc} */ - @Override public void testInvoke() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-1525"); - } - /** * @throws Exception If failed. */ @@ -217,6 +208,70 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr } /** + * + */ + public void testOriginatingNodeFailureForcesOnePhaseCommitDataCleanup() throws Exception { + ignite(0).createCache(cacheConfiguration(TestMemoryMode.HEAP, false)); + + final AtomicBoolean finished = new AtomicBoolean(); + + final int keysCnt = keysCount(); + + IgniteInternalFuture<Object> fut = runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Random rnd = new Random(); + + while (!finished.get()) { + stopGrid(0); + + U.sleep(300); + + startGrid(0); + + if (rnd.nextBoolean()) // OPC possible only when there is no migration from one backup to another. + awaitPartitionMapExchange(); + } + + return null; + } + }); + + IgniteInternalFuture<Object> fut2 = runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + int iter = 0; + + while (!finished.get()) { + try { + IgniteCache<Integer, Integer> cache = ignite(0).cache(null); + + Integer val = ++iter; + + for (int i = 0; i < keysCnt; i++) + cache.invoke(i, new SetEntryProcessor(val)); + } + catch (Exception e) { + // No-op. + } + } + + return null; + } + }); + + try { + U.sleep(DURATION); + } + finally { + finished.set(true); + + fut.get(); + fut2.get(); + } + + checkOnePhaseCommitReturnValuesCleaned(); + } + + /** * Callable to process inside transaction. 
*/ private static class ProcessCallable implements Callable<Void> { http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/config/benchmark-client-mode.properties ---------------------------------------------------------------------- diff --git a/modules/yardstick/config/benchmark-client-mode.properties b/modules/yardstick/config/benchmark-client-mode.properties index ba5525f..f7c8347 100644 --- a/modules/yardstick/config/benchmark-client-mode.properties +++ b/modules/yardstick/config/benchmark-client-mode.properties @@ -70,6 +70,8 @@ CONFIGS="\ -cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgnitePutBenchmark -sn IgniteNode -ds ${ver}atomic-put-1-backup,\ -cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgnitePutGetBenchmark -sn IgniteNode -ds ${ver}atomic-put-get-1-backup,\ -cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgnitePutTxBenchmark -sn IgniteNode -ds ${ver}tx-put-1-backup,\ +-cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgniteGetAndPutTxBenchmark -sn IgniteNode -ds ${ver}tx-getAndPut-1-backup,\ +-cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgniteInvokeTxBenchmark -sn IgniteNode -ds ${ver}tx-invoke-1-backup,\ -cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgnitePutGetTxBenchmark -sn IgniteNode -ds ${ver}tx-put-get-1-backup,\ -cfg ${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgniteSqlQueryBenchmark -sn IgniteNode -ds ${ver}sql-query-1-backup,\ -cfg 
${SCRIPT_DIR}/../config/ignite-multicast-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 --client -sm PRIMARY_SYNC -dn IgniteSqlQueryJoinBenchmark -sn IgniteNode -ds ${ver}sql-query-join-1-backup,\ http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/config/benchmark-tx-win.properties ---------------------------------------------------------------------- diff --git a/modules/yardstick/config/benchmark-tx-win.properties b/modules/yardstick/config/benchmark-tx-win.properties index 73b857d..54a40b1 100644 --- a/modules/yardstick/config/benchmark-tx-win.properties +++ b/modules/yardstick/config/benchmark-tx-win.properties @@ -54,6 +54,8 @@ set DRIVER_HOSTS=localhost :: Note that each benchmark is set to run for 300 seconds (5 mins) with warm-up set to 60 seconds (1 minute). set CONFIGS=^ -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxBenchmark -sn IgniteNode -ds tx-put-1-backup,^ +-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteGetAndPutTxBenchmark -sn IgniteNode -ds tx-getAndPut-1-backup,^ +-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteInvokeTxBenchmark -sn IgniteNode -ds tx-invoke-1-backup,^ -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxOffHeapBenchmark -sn IgniteNode -ds tx-put-offheap-1-backup,^ -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxOffHeapValuesBenchmark -sn IgniteNode -ds tx-put-offheap-val-1-backup,^ -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetTxBenchmark -sn IgniteNode -ds tx-put-get-1-backup,^ http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/config/benchmark-tx.properties 
---------------------------------------------------------------------- diff --git a/modules/yardstick/config/benchmark-tx.properties b/modules/yardstick/config/benchmark-tx.properties index f3dbc24..0d5bb02 100644 --- a/modules/yardstick/config/benchmark-tx.properties +++ b/modules/yardstick/config/benchmark-tx.properties @@ -59,6 +59,8 @@ nodesNum=$((`echo ${SERVER_HOSTS} | tr ',' '\n' | wc -l` + `echo ${DRIVER_HOSTS} # Note that each benchmark is set to run for 300 seconds (5 mins) with warm-up set to 60 seconds (1 minute). CONFIGS="\ -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxBenchmark -sn IgniteNode -ds tx-put-1-backup,\ +-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteGetAndPutTxBenchmark -sn IgniteNode -ds tx-getAndPut-1-backup,\ +-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteInvokeTxBenchmark -sn IgniteNode -ds tx-invoke-1-backup,\ -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxOffHeapBenchmark -sn IgniteNode -ds tx-put-offheap-1-backup,\ -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxOffHeapValuesBenchmark -sn IgniteNode -ds tx-put-offheap-val-1-backup,\ -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetTxBenchmark -sn IgniteNode -ds tx-put-get-1-backup,\ http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/config/benchmark-win.properties ---------------------------------------------------------------------- diff --git a/modules/yardstick/config/benchmark-win.properties b/modules/yardstick/config/benchmark-win.properties index b6ecd67..b75b5d6 100644 --- 
a/modules/yardstick/config/benchmark-win.properties +++ b/modules/yardstick/config/benchmark-win.properties @@ -59,6 +59,8 @@ set CONFIGS=^ -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutBenchmark -sn IgniteNode -ds atomic-put-1-backup,^ -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetBenchmark -sn IgniteNode -ds atomic-put-get-1-backup,^ -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxBenchmark -sn IgniteNode -ds tx-put-1-backup,^ +-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteGetAndPutTxBenchmark -sn IgniteNode -ds tx-getAndPut-1-backup,^ +-cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteInvokeTxBenchmark -sn IgniteNode -ds tx-invoke-1-backup,^ -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetTxBenchmark -sn IgniteNode -ds tx-put-get-1-backup,^ -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteSqlQueryBenchmark -sn IgniteNode -ds sql-query-1-backup,^ -cfg %SCRIPT_DIR%\..\config\ignite-localhost-config.xml -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteSqlQueryJoinBenchmark -sn IgniteNode -ds sql-query-join-1-backup,^ http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/config/benchmark.properties ---------------------------------------------------------------------- diff --git a/modules/yardstick/config/benchmark.properties b/modules/yardstick/config/benchmark.properties index 67ef5ef..cfc1499 100644 --- a/modules/yardstick/config/benchmark.properties +++ b/modules/yardstick/config/benchmark.properties @@ -71,6 +71,8 @@ CONFIGS="\ -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 
60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutBenchmark -sn IgniteNode -ds ${ver}atomic-put-1-backup,\ -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetBenchmark -sn IgniteNode -ds ${ver}atomic-put-get-1-backup,\ -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutTxBenchmark -sn IgniteNode -ds ${ver}tx-put-1-backup,\ +-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteGetAndPutTxBenchmark -sn IgniteNode -ds ${ver}tx-getAndPut-1-backup,\ +-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteInvokeTxBenchmark -sn IgniteNode -ds ${ver}tx-invoke-1-backup,\ -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgnitePutGetTxBenchmark -sn IgniteNode -ds ${ver}tx-put-get-1-backup,\ -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteSqlQueryBenchmark -sn IgniteNode -ds ${ver}sql-query-1-backup,\ -cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 1 -w 60 -d 300 -t 64 -sm PRIMARY_SYNC -dn IgniteSqlQueryJoinBenchmark -sn IgniteNode -ds ${ver}sql-query-join-1-backup,\ http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java new file mode 100644 index 0000000..40e563c --- /dev/null +++ 
b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutBenchmark.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yardstick.cache; + +import java.util.Map; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.yardstick.cache.model.SampleValue; + +/** + * Ignite benchmark that performs getAndPut operations. 
+ */ +public class IgniteGetAndPutBenchmark extends IgniteCacheAbstractBenchmark<Integer, Object> { + /** {@inheritDoc} */ + @Override public boolean test(Map<Object, Object> ctx) throws Exception { + int key = nextRandom(args.range()); + + cache.getAndPut(key, new SampleValue(key)); + + return true; + } + + /** {@inheritDoc} */ + @Override protected IgniteCache<Integer, Object> cache() { + return ignite().cache("atomic"); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java new file mode 100644 index 0000000..49ae985 --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteGetAndPutTxBenchmark.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.yardstick.cache; + +import java.util.Map; +import java.util.concurrent.Callable; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.yardstick.IgniteBenchmarkUtils; +import org.apache.ignite.yardstick.cache.model.SampleValue; +import org.yardstickframework.BenchmarkConfiguration; + +/** + * Ignite benchmark that performs transactional getAndPut operations. + */ +public class IgniteGetAndPutTxBenchmark extends IgniteCacheAbstractBenchmark<Integer, Object> { + /** */ + private IgniteTransactions txs; + + /** */ + private Callable<Void> clo; + + /** {@inheritDoc} */ + @Override public void setUp(BenchmarkConfiguration cfg) throws Exception { + super.setUp(cfg); + + if (!IgniteSystemProperties.getBoolean("SKIP_MAP_CHECK")) + ignite().compute().broadcast(new WaitMapExchangeFinishCallable()); + + txs = ignite().transactions(); + + clo = new Callable<Void>() { + @Override public Void call() throws Exception { + int key = nextRandom(args.range()); + + cache.getAndPut(key, new SampleValue(key)); + + return null; + } + }; + } + + /** {@inheritDoc} */ + @Override public boolean test(Map<Object, Object> ctx) throws Exception { + IgniteBenchmarkUtils.doInTransaction(txs, args.txConcurrency(), args.txIsolation(), clo); + + return true; + } + + /** {@inheritDoc} */ + @Override protected IgniteCache<Integer, Object> cache() { + return ignite().cache("tx"); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeTxBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeTxBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeTxBenchmark.java index 8f05598..64dc6b8 100644 --- 
a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeTxBenchmark.java +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteInvokeTxBenchmark.java @@ -17,12 +17,52 @@ package org.apache.ignite.yardstick.cache; +import java.util.Map; +import java.util.concurrent.Callable; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.yardstick.IgniteBenchmarkUtils; +import org.apache.ignite.yardstick.cache.model.SampleValue; +import org.yardstickframework.BenchmarkConfiguration; /** * Ignite benchmark that performs invoke operations. */ public class IgniteInvokeTxBenchmark extends IgniteInvokeBenchmark { + /** */ + private IgniteTransactions txs; + + /** */ + private Callable<Void> clo; + + /** {@inheritDoc} */ + @Override public void setUp(BenchmarkConfiguration cfg) throws Exception { + super.setUp(cfg); + + if (!IgniteSystemProperties.getBoolean("SKIP_MAP_CHECK")) + ignite().compute().broadcast(new WaitMapExchangeFinishCallable()); + + txs = ignite().transactions(); + + clo = new Callable<Void>() { + @Override public Void call() throws Exception { + int key = nextRandom(args.range()); + + cache.invoke(key, new SetValueEntryProcessor(new SampleValue(key))); + + return null; + } + }; + } + + /** {@inheritDoc} */ + @Override public boolean test(Map<Object, Object> ctx) throws Exception { + IgniteBenchmarkUtils.doInTransaction(txs, args.txConcurrency(), args.txIsolation(), clo); + + return true; + } + /** {@inheritDoc} */ @Override protected IgniteCache<Integer, Object> cache() { return ignite().cache("tx");