IGNITE-8149: MVCC: correct processing of cache size. This closes #4654.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/00e935d3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/00e935d3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/00e935d3 Branch: refs/heads/ignite-5960 Commit: 00e935d33cd852cdc2a460f24ca0f0136ac5c59e Parents: 40432b0 Author: ipavlukhin <vololo...@gmail.com> Authored: Fri Sep 7 16:00:02 2018 +0300 Committer: devozerov <voze...@gridgain.com> Committed: Fri Sep 7 16:00:02 2018 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 4 +- .../processors/cache/GridCacheMapEntry.java | 12 + .../cache/IgniteCacheOffheapManager.java | 8 + .../cache/IgniteCacheOffheapManagerImpl.java | 54 +- .../GridDistributedTxRemoteAdapter.java | 51 +- .../dht/GridDhtPartitionsUpdateCountersMap.java | 119 ----- .../dht/GridDhtTxAbstractEnlistFuture.java | 2 +- .../distributed/dht/GridDhtTxFinishFuture.java | 6 +- .../distributed/dht/GridDhtTxFinishRequest.java | 14 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 51 +- .../cache/distributed/dht/GridDhtTxRemote.java | 12 - .../dht/PartitionUpdateCounters.java | 123 +++++ .../persistence/GridCacheOffheapManager.java | 13 + .../cache/transactions/IgniteInternalTx.java | 8 +- .../cache/transactions/IgniteTxAdapter.java | 57 ++- .../cache/transactions/IgniteTxHandler.java | 2 +- .../transactions/IgniteTxLocalAdapter.java | 104 ++-- .../cache/transactions/IgniteTxLocalEx.java | 8 + .../cache/transactions/IgniteTxLocalState.java | 16 + .../transactions/IgniteTxLocalStateAdapter.java | 26 + .../cache/transactions/IgniteTxRemoteEx.java | 12 - .../cache/transactions/TxCounters.java | 82 ++++ .../cache/mvcc/CacheMvccSizeTest.java | 488 +++++++++++++++++++ .../testsuites/IgniteCacheMvccSqlTestSuite.java | 5 +- 24 files changed, 990 insertions(+), 287 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 2970e71..14f0548 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -70,7 +70,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffini import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsUpdateCountersMap; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest; @@ -80,6 +79,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQuer import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryEnlistResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryFirstEnlistRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCounters; import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest; @@ -1062,7 +1062,7 @@ public class GridIoMessageFactory implements MessageFactory { break; case 157: - msg = new GridDhtPartitionsUpdateCountersMap(); + msg = new PartitionUpdateCounters(); break; http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 8c2b939..714d4a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1122,6 +1122,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (tx.local()) updateCntr = nextMvccPartitionCounter(); + if (res.resultType() == ResultType.PREV_NULL) + tx.txCounters(true).accumulateSizeDelta(cctx.cacheId(), partition(), 1); + if (cctx.group().persistenceEnabled() && cctx.group().walEnabled()) logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry( cctx.cacheId(), @@ -1217,6 +1220,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (tx.local()) updateCntr = nextMvccPartitionCounter(); + if (res.resultType() == ResultType.PREV_NOT_NULL) + tx.txCounters(true).accumulateSizeDelta(cctx.cacheId(), partition(), -1); + if (cctx.group().persistenceEnabled() && cctx.group().walEnabled()) logPtr = logTxUpdate(tx, null, 0, 
updateCntr); @@ -4994,6 +5000,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (updateCntr != null && updateCntr != 0) updateCntr0 = updateCntr; + if (res.resultType() == ResultType.PREV_NOT_NULL) + tx.txCounters(true).accumulateSizeDelta(cctx.cacheId(), entry.partition(), -1); + if (cctx.group().persistenceEnabled() && cctx.group().walEnabled()) logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry( cctx.cacheId(), @@ -5281,6 +5290,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme updateCntr0 = tx.local() ? entry.nextMvccPartitionCounter() : updateCntr; + if (res.resultType() == ResultType.PREV_NULL) + tx.txCounters(true).accumulateSizeDelta(cctx.cacheId(), entry.partition(), 1); + if (cctx.group().persistenceEnabled() && cctx.group().walEnabled()) logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry( cctx.cacheId(), http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index a021394..fdf42fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -620,6 +620,14 @@ public interface IgniteCacheOffheapManager { long fullSize(); /** + * Updates size metric for particular cache. + * + * @param cacheId Cache ID. + * @param delta Size delta. + */ + void updateSize(int cacheId, long delta); + + /** * @return Update counter. 
*/ long updateCounter(); http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 13ad7e2..8811006 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -1450,36 +1450,14 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager * @param cacheId Cache ID. */ void incrementSize(int cacheId) { - storageSize.incrementAndGet(); - - if (grp.sharedGroup()) { - AtomicLong size = cacheSizes.get(cacheId); - - if (size == null) { - AtomicLong old = cacheSizes.putIfAbsent(cacheId, size = new AtomicLong()); - - if (old != null) - size = old; - } - - size.incrementAndGet(); - } + updateSize(cacheId, 1); } /** * @param cacheId Cache ID. 
*/ void decrementSize(int cacheId) { - storageSize.decrementAndGet(); - - if (grp.sharedGroup()) { - AtomicLong size = cacheSizes.get(cacheId); - - if (size == null) - return; - - size.decrementAndGet(); - } + updateSize(cacheId, -1); } /** {@inheritDoc} */ @@ -1517,6 +1495,24 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ + @Override public void updateSize(int cacheId, long delta) { + storageSize.addAndGet(delta); + + if (grp.sharedGroup()) { + AtomicLong size = cacheSizes.get(cacheId); + + if (size == null) { + AtomicLong old = cacheSizes.putIfAbsent(cacheId, size = new AtomicLong()); + + if (old != null) + size = old; + } + + size.addAndGet(delta); + } + } + + /** {@inheritDoc} */ @Override public long nextUpdateCounter() { return cntr.incrementAndGet(); } @@ -1953,8 +1949,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager assert !old; - incrementSize(cctx.cacheId()); - GridCacheQueryManager qryMgr = cctx.queries(); if (qryMgr.enabled()) @@ -2284,11 +2278,13 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager rowStore.removeRow(row.link()); - decrementSize(cctx.cacheId()); - if (first) first = false; } + + // first == true means there were no row versions + if (!first) + decrementSize(cctx.cacheId()); } /** {@inheritDoc} */ @@ -2318,8 +2314,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager rowStore.removeRow(cleanupRow.link()); - decrementSize(cctx.cacheId()); - res++; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index dd6ea48..8e96ae2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -51,7 +51,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsUpdateCountersMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCounters; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; import org.apache.ignite.internal.processors.cache.persistence.StorageException; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; @@ -61,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxRemoteEx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxRemoteState; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState; +import org.apache.ignite.internal.processors.cache.transactions.TxCounters; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; @@ -807,7 +809,7 @@ public abstract class 
GridDistributedTxRemoteAdapter extends IgniteTxAdapter } } - updateLocalCounters(); + applyTxCounters(); if (!near() && !F.isEmpty(dataEntries) && cctx.wal() != null) { // Set new update counters for data entries received from persisted tx entries. @@ -845,29 +847,34 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter } } - /** - * Applies update counters to the local partitions. - */ - private void updateLocalCounters() { - Map<Integer, GridDhtPartitionsUpdateCountersMap> updCntrsMap = updateCountersMap(); + /** {@inheritDoc} */ + @Override protected void applyTxCounters() { + super.applyTxCounters(); + + TxCounters txCntrs = txCounters(false); - if (F.isEmpty(updCntrsMap)) + if (txCntrs == null) return; - for (Map.Entry<Integer, GridDhtPartitionsUpdateCountersMap> entry : updCntrsMap.entrySet()) { - GridCacheContext cacheCtx = cctx.cacheContext(entry.getKey()); + Map<Integer, PartitionUpdateCounters> updCntrs = txCntrs.updateCounters(); - GridDhtPartitionsUpdateCountersMap cacheUpdCntrs = entry.getValue(); + for (Map.Entry<Integer, PartitionUpdateCounters> entry : updCntrs.entrySet()) { + int cacheId = entry.getKey(); - assert cacheUpdCntrs != null && !F.isEmpty(cacheUpdCntrs.updateCounters()); + GridDhtPartitionTopology top = cctx.cacheContext(cacheId).topology(); - for (Map.Entry<Integer, Long> e : cacheUpdCntrs.updateCounters().entrySet()) { - Long updCntr = e.getValue(); - GridDhtLocalPartition part = cacheCtx.topology().localPartition(e.getKey()); + Map<Integer, Long> cacheUpdCntrs = entry.getValue().updateCounters(); - assert part != null && updCntr != null && updCntr > 0; + assert cacheUpdCntrs != null; - part.updateCounter(updCntr); + for (Map.Entry<Integer, Long> e : cacheUpdCntrs.entrySet()) { + long updCntr = e.getValue(); + + GridDhtLocalPartition dhtPart = top.localPartition(e.getKey()); + + assert dhtPart != null && updCntr > 0; + + dhtPart.updateCounter(updCntr); } } } @@ -1023,16 +1030,6 @@ public abstract class 
GridDistributedTxRemoteAdapter extends IgniteTxAdapter } /** {@inheritDoc} */ - @Override public void updateCountersMap(Map<Integer, GridDhtPartitionsUpdateCountersMap> updCntrsMap) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public Map<Integer, GridDhtPartitionsUpdateCountersMap> updateCountersMap() { - return null; - } - - /** {@inheritDoc} */ @Override public String toString() { return GridToStringBuilder.toString(GridDistributedTxRemoteAdapter.class, this, "super", super.toString()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsUpdateCountersMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsUpdateCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsUpdateCountersMap.java deleted file mode 100644 index 7a345d1..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsUpdateCountersMap.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.dht; - -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; -import org.apache.ignite.internal.GridDirectMap; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; - -/** - * Partitions update counters message. - */ -public class GridDhtPartitionsUpdateCountersMap implements Message { - /** */ - private static final long serialVersionUID = -4599730112233297219L; - - /** Map of update counters made by this tx. Mapping: partId -> updCntr. */ - @GridDirectMap(keyType = Integer.class, valueType = Long.class) - private Map<Integer, Long> updCntrs; - - /** - * - */ - public GridDhtPartitionsUpdateCountersMap() { - updCntrs = new HashMap<>(); - } - - /** - * @return Update counters. - */ - public Map<Integer, Long> updateCounters() { - return updCntrs; - } - - /** - * @param updCntrs Update counters. 
- */ - public void updateCounters(Map<Integer, Long> updCntrs) { - this.updCntrs = updCntrs; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeMap("updCntrs", updCntrs, MessageCollectionItemType.INT, MessageCollectionItemType.LONG)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - switch (reader.state()) { - case 0: - updCntrs = reader.readMap("updCntrs", MessageCollectionItemType.INT, MessageCollectionItemType.LONG, false); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return reader.afterMessageRead(GridDhtPartitionsUpdateCountersMap.class); - } - - /** {@inheritDoc} */ - @Override public short directType() { - return 157; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 1; - } - - /** {@inheritDoc} */ - @Override public void onAckReceived() { - // No-op. 
- } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java index bb11df5..a3471c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java @@ -593,7 +593,7 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt int part = cctx.affinity().partition(key); - tx.addPartitionCountersMapping(cacheId, part); + tx.touchPartition(cacheId, part); if (F.isEmpty(backups)) return; http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index c0a3845..4c72e6f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -457,10 +457,10 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity for (IgniteTxEntry e : dhtMapping.entries()) 
updCntrs.add(e.updateCounter()); - Map<Integer, GridDhtPartitionsUpdateCountersMap> updCntrsMap = null; + Map<Integer, PartitionUpdateCounters> updCntrsForNode = null; if (dhtMapping.queryUpdate() && commit) - updCntrsMap = tx.updateCountersForNode(n); + updCntrsForNode = tx.filterUpdateCountersForBackupNode(n); GridDhtTxFinishRequest req = new GridDhtTxFinishRequest( tx.nearNodeId(), @@ -489,7 +489,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity false, false, mvccSnapshot, - updCntrsMap); + updCntrsForNode); req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : tx.xidVersion()); http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java index b1908ff..8e9ece6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java @@ -73,9 +73,10 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { /** */ private MvccSnapshot mvccSnapshot; - /** Map of update counters made by this tx. Mapping: cacheId -> partId -> updCntr. 
*/ - @GridDirectMap(keyType = Integer.class, valueType = GridDhtPartitionsUpdateCountersMap.class) - private Map<Integer, GridDhtPartitionsUpdateCountersMap> updCntrs; + /** */ + @GridDirectMap(keyType = Integer.class, valueType = PartitionUpdateCounters.class) + private Map<Integer, PartitionUpdateCounters> updCntrs; + /** * Empty constructor required for {@link Externalizable}. */ @@ -220,7 +221,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { boolean retVal, boolean waitRemoteTxs, MvccSnapshot mvccSnapshot, - Map<Integer, GridDhtPartitionsUpdateCountersMap> updCntrs + Map<Integer, PartitionUpdateCounters> updCntrs ) { this(nearNodeId, futId, @@ -362,10 +363,11 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { public void needReturnValue(boolean retVal) { setFlag(retVal, NEED_RETURN_VALUE_FLAG_MASK); } + /** - * @return Partition update counters map. + * @return Partition counters update deferred until transaction commit. */ - public Map<Integer, GridDhtPartitionsUpdateCountersMap> updateCountersMap() { + public Map<Integer, PartitionUpdateCounters> updateCounters() { return updCntrs; } http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 11b46a0..613f160 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -20,6 +20,7 @@ package 
org.apache.ignite.internal.processors.cache.distributed.dht; import java.io.Externalizable; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -31,6 +32,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; @@ -42,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPr import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter; +import org.apache.ignite.internal.processors.cache.transactions.TxCounters; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridLeanMap; @@ -907,7 +910,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { * * @return Current lock future or null if it's safe to roll back. */ - public @Nullable IgniteInternalFuture<?> tryRollbackAsync() { + @Nullable public IgniteInternalFuture<?> tryRollbackAsync() { while (true) { final IgniteInternalFuture fut = lockFut; @@ -937,6 +940,52 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { return prepFut; } + /** + * @param node Backup node. + * @return Partition counters map for the given backup node. 
+ */ + public Map<Integer, PartitionUpdateCounters> filterUpdateCountersForBackupNode(ClusterNode node) { + TxCounters txCntrs = txCounters(false); + + if (txCntrs == null) + return null; + + Map<Integer, PartitionUpdateCounters> updCntrs = txCntrs.updateCounters(); + + Map<Integer, PartitionUpdateCounters> res = new HashMap<>(); + + AffinityTopologyVersion top = topologyVersionSnapshot(); + + for (Map.Entry<Integer, PartitionUpdateCounters> entry : updCntrs.entrySet()) { + Integer cacheId = entry.getKey(); + + Map<Integer, Long> partsCntrs = entry.getValue().updateCounters(); + + assert !F.isEmpty(partsCntrs); + + GridCacheAffinityManager affinity = cctx.cacheContext(cacheId).affinity(); + + Map<Integer, Long> resCntrs = new HashMap<>(partsCntrs.size()); + + for (Map.Entry<Integer, Long> e : partsCntrs.entrySet()) { + Integer p = e.getKey(); + + Long cntr = e.getValue(); + + if (affinity.backupByPartition(node, p, top)) { + assert cntr != null && cntr > 0 : cntr; + + resCntrs.put(p, cntr); + } + } + + if (!resCntrs.isEmpty()) + res.put(cacheId, new PartitionUpdateCounters(resCntrs)); + } + + return res; + } + /** {@inheritDoc} */ @Override public String toString() { return GridToStringBuilder.toString(GridDhtTxLocalAdapter.class, this, "nearNodes", nearMap.keySet(), http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index 08ecf28..9a1763b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -79,8 +79,6 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { /** Store write through flag. */ private boolean storeWriteThrough; - /** Map of update counters made by this tx. Mapping: cacheId -> partId -> updCntr. */ - private Map<Integer, GridDhtPartitionsUpdateCountersMap> updCntrs; /** * Empty constructor required for {@link Externalizable}. */ @@ -503,16 +501,6 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { } /** {@inheritDoc} */ - @Override public void updateCountersMap(Map<Integer, GridDhtPartitionsUpdateCountersMap> updCntrsMap) { - this.updCntrs = updCntrsMap; - } - - /** {@inheritDoc} */ - @Override public Map<Integer, GridDhtPartitionsUpdateCountersMap> updateCountersMap() { - return updCntrs; - } - - /** {@inheritDoc} */ @Override public String toString() { return GridToStringBuilder.toString(GridDhtTxRemote.class, this, "super", super.toString()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCounters.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCounters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCounters.java new file mode 100644 index 0000000..5b1eccd --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCounters.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import java.nio.ByteBuffer; +import java.util.Map; +import org.apache.ignite.internal.GridDirectMap; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Partition update counters message. + */ +public class PartitionUpdateCounters implements Message { + /** */ + private static final long serialVersionUID = 193442457510062844L; + + /** Map of update counters made by this tx. Mapping: partId -> updCntr. */ + @GridDirectMap(keyType = Integer.class, valueType = Long.class) + private Map<Integer, Long> updCntrs; + + /** */ + public PartitionUpdateCounters() { + // No-op. + } + + /** + * @param updCntrs Update counters map. + */ + public PartitionUpdateCounters(Map<Integer, Long> updCntrs) { + this.updCntrs = updCntrs; + } + + /** + * @return Update counters. + */ + public Map<Integer, Long> updateCounters() { + return updCntrs; + } + + /** + * @param updCntrs Update counters. 
+ */ + public void updateCounters(Map<Integer, Long> updCntrs) { + this.updCntrs = updCntrs; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeMap("updCntrs", updCntrs, MessageCollectionItemType.INT, MessageCollectionItemType.LONG)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + updCntrs = reader.readMap("updCntrs", MessageCollectionItemType.INT, MessageCollectionItemType.LONG, false); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(PartitionUpdateCounters.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 157; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. 
+ } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index 199efcb..04476ad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -1563,6 +1563,19 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ + @Override public void updateSize(int cacheId, long delta) { + try { + CacheDataStore delegate0 = init0(false); + + if (delegate0 != null) + delegate0.updateSize(cacheId, delta); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ @Override public long updateCounter() { try { CacheDataStore delegate0 = init0(true); http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index 4acf078..05ebe5d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -650,4 +650,10 @@ public 
interface IgniteInternalTx { * @return Mvcc snapshot. */ public MvccSnapshot mvccSnapshot(); -} \ No newline at end of file + + /** + * @return Transaction counters. + * @param createIfAbsent {@code True} if non-null instance is needed. + */ + @Nullable public TxCounters txCounters(boolean createIfAbsent); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index ee5a58e..c6d1991 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.cache.expiry.ExpiryPolicy; @@ -62,10 +63,12 @@ import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; 
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; import org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -136,6 +139,10 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement private static final AtomicReferenceFieldUpdater<IgniteTxAdapter, FinalizationStatus> FINALIZING_UPD = AtomicReferenceFieldUpdater.newUpdater(IgniteTxAdapter.class, FinalizationStatus.class, "finalizing"); + /** */ + private static final AtomicReferenceFieldUpdater<IgniteTxAdapter, TxCounters> TX_COUNTERS_UPD = + AtomicReferenceFieldUpdater.newUpdater(IgniteTxAdapter.class, TxCounters.class, "txCounters"); + /** Logger. */ protected static IgniteLogger log; @@ -275,6 +282,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement @GridToStringExclude private volatile IgniteInternalFuture rollbackFut; + /** */ + private volatile TxCounters txCounters = new TxCounters(); + /** * Empty constructor required for {@link Externalizable}. */ @@ -2015,6 +2025,46 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement public abstract void addActiveCache(GridCacheContext cacheCtx, boolean recovery) throws IgniteCheckedException; /** {@inheritDoc} */ + @Nullable @Override public TxCounters txCounters(boolean createIfAbsent) { + if (createIfAbsent && txCounters == null) + TX_COUNTERS_UPD.compareAndSet(this, null, new TxCounters()); + + return txCounters; + } + + /** + * Make counters accumulated during transaction visible outside of transaction. 
+ */ + protected void applyTxCounters() { + TxCounters txCntrs = txCounters(false); + + if (txCntrs == null) + return; + + Map<Integer, ? extends Map<Integer, AtomicLong>> sizeDeltas = txCntrs.sizeDeltas(); + + for (Map.Entry<Integer, ? extends Map<Integer, AtomicLong>> entry : sizeDeltas.entrySet()) { + Integer cacheId = entry.getKey(); + Map<Integer, AtomicLong> partDeltas = entry.getValue(); + + assert !F.isEmpty(partDeltas); + + GridDhtPartitionTopology top = cctx.cacheContext(cacheId).topology(); + + for (Map.Entry<Integer, AtomicLong> e : partDeltas.entrySet()) { + Integer p = e.getKey(); + long delta = e.getValue().get(); + + GridDhtLocalPartition dhtPart = top.localPartition(p); + + assert dhtPart != null; + + dhtPart.dataStore().updateSize(cacheId, delta); + } + } + } + + /** {@inheritDoc} */ @Override public String toString() { return GridToStringBuilder.toString(IgniteTxAdapter.class, this, "duration", (U.currentTimeMillis() - startTime) + "ms", @@ -2287,6 +2337,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ + @Nullable @Override public TxCounters txCounters(boolean createIfAbsent) { + return null; + } + + /** {@inheritDoc} */ @Override public IgniteTxState txState() { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 7541b43..32f4dd4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ 
-1391,7 +1391,7 @@ public class IgniteTxHandler { tx.setPartitionUpdateCounters( req.partUpdateCounters() != null ? req.partUpdateCounters().array() : null); - tx.updateCountersMap(req.updateCountersMap()); + tx.txCounters(true).updateCounters(req.updateCounters()); tx.commitRemoteTx(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index b86273f..bfe67ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -24,7 +24,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.cache.expiry.Duration; @@ -32,7 +31,6 @@ import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.failure.FailureContext; import org.apache.ignite.failure.FailureType; import org.apache.ignite.internal.IgniteInternalFuture; @@ -59,7 +57,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; 
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsUpdateCountersMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCounters; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.persistence.StorageException; @@ -162,9 +160,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig /** */ private GridLongList mvccWaitTxs; - /** Update counters map */ - private Map<Integer, Map<Integer, Long>> updCntrs; - /** */ private volatile boolean qryEnlisted; @@ -918,7 +913,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig } } - updateLocalPartitionCounters(); + applyTxCounters(); if (ptr != null && !cctx.tm().logTxRecords()) cctx.wal().flush(ptr, false); @@ -1563,6 +1558,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig } /** {@inheritDoc} */ + @Override public void touchPartition(int cacheId, int partId) { + txState.touchPartition(cacheId, partId); + } + + /** {@inheritDoc} */ @Override public String toString() { return GridToStringBuilder.toString(IgniteTxLocalAdapter.class, this, "super", super.toString(), "size", allEntries().size()); @@ -1644,74 +1644,28 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig } /** - * @return Partition counters map for the given backup node. + * Merges mvcc update counters to the partition update counters. For mvcc transactions we update partitions + * counters only on commit phase. 
*/ - public Map<Integer, GridDhtPartitionsUpdateCountersMap> updateCountersForNode(ClusterNode node) { - if (F.isEmpty(updCntrs)) + private Map<Integer, PartitionUpdateCounters> applyAndCollectLocalUpdateCounters() { + if (F.isEmpty(txState.touchedPartitions())) return null; - Map<Integer, GridDhtPartitionsUpdateCountersMap> res = new HashMap<>(); + HashMap<Integer, PartitionUpdateCounters> updCntrs = new HashMap<>(); - for (Map.Entry<Integer, Map<Integer, Long>> entry : updCntrs.entrySet()) { - Map<Integer, Long> partsCntrs = entry.getValue(); + for (Map.Entry<Integer, Set<Integer>> entry : txState.touchedPartitions().entrySet()) { + Integer cacheId = entry.getKey(); - assert !F.isEmpty(partsCntrs); + Set<Integer> parts = entry.getValue(); - GridCacheContext ctx0 = cctx.cacheContext(entry.getKey()); + assert !F.isEmpty(parts); - GridDhtPartitionsUpdateCountersMap resBackupCntrs = new GridDhtPartitionsUpdateCountersMap(); + GridCacheContext ctx0 = cctx.cacheContext(cacheId); - for (Map.Entry<Integer, Long> e : partsCntrs.entrySet()) { - Long cntr = partsCntrs.get(e.getKey()); + Map<Integer, Long> partCntrs = new HashMap<>(parts.size()); - if (ctx0.affinity().backupByPartition(node, e.getKey(), topologyVersionSnapshot())) { - assert cntr != null && cntr > 0 : cntr; - - resBackupCntrs.updateCounters().put(e.getKey(), cntr); - } - } - - if (!resBackupCntrs.updateCounters().isEmpty()) - res.put(entry.getKey(), resBackupCntrs); - } - - return res; - } - - /** - * @param cacheId Cache id. - * @param part Partition id. - */ - @SuppressWarnings("Java8MapApi") - public void addPartitionCountersMapping(Integer cacheId, Integer part) { - if (updCntrs == null) - updCntrs = new ConcurrentHashMap<>(); - - Map<Integer, Long> partUpdCntrs = updCntrs.get(cacheId); - - if (partUpdCntrs == null) - updCntrs.put(cacheId, partUpdCntrs = new ConcurrentHashMap<>()); - - partUpdCntrs.put(part, 0L); - } - - /** - * Merges mvcc update counters to the partition update counters. 
For mvcc transactions we update partitions - * counters only on commit phase. - */ - private void updateLocalPartitionCounters() { - if (F.isEmpty(updCntrs)) - return; - - for (Map.Entry<Integer, Map<Integer, Long>> entry : updCntrs.entrySet()) { - Map<Integer, Long> partsCntrs = entry.getValue(); - - assert !F.isEmpty(partsCntrs); - - GridCacheContext ctx0 = cctx.cacheContext(entry.getKey()); - - for (Map.Entry<Integer, Long> e : partsCntrs.entrySet()) { - GridDhtLocalPartition dhtPart = ctx0.topology().localPartition(e.getKey()); + for (Integer p : parts) { + GridDhtLocalPartition dhtPart = ctx0.topology().localPartition(p); assert dhtPart != null; @@ -1719,11 +1673,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig dhtPart.updateCounter(cntr); - Long prev = partsCntrs.put(e.getKey(), cntr); - - assert prev == 0L : prev; + partCntrs.put(p, cntr); } + + updCntrs.put(cacheId, new PartitionUpdateCounters(partCntrs)); } + + return updCntrs; } /** @@ -1747,6 +1703,16 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig } } + /** {@inheritDoc} */ + @Override protected void applyTxCounters() { + super.applyTxCounters(); + + Map<Integer, PartitionUpdateCounters> updCntrs = applyAndCollectLocalUpdateCounters(); + + // remember counters for subsequent sending to backups + txCounters(true).updateCounters(updCntrs); + } + /** * Post-lock closure alias. 
* http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java index b61b1a9..651be60 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java @@ -56,4 +56,12 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { * @throws IgniteCheckedException If finish failed. */ public boolean localFinish(boolean commit, boolean clearThreadMap) throws IgniteCheckedException; + + /** + * Remembers that particular cache partition was touched by current tx. + * + * @param cacheId Cache id. + * @param partId Partition id. 
+ */ + public void touchPartition(int cacheId, int partId); } http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java index 123d396..01eb4f4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java @@ -17,6 +17,9 @@ package org.apache.ignite.internal.processors.cache.transactions; +import java.util.Map; +import java.util.Set; + /** * */ @@ -41,4 +44,17 @@ public interface IgniteTxLocalState extends IgniteTxState { * */ public void seal(); + + /** + * @return Cache partitions touched by current tx. + */ + public Map<Integer, Set<Integer>> touchedPartitions(); + + /** + * Remembers that particular cache partition was touched by current tx. + * + * @param cacheId Cache id. + * @param partId Partition id. 
+ */ + public void touchPartition(int cacheId, int partId); } http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java index 4943aac..9c6ef8f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java @@ -17,7 +17,13 @@ package org.apache.ignite.internal.processors.cache.transactions; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.util.typedef.internal.U; @@ -25,6 +31,11 @@ import org.apache.ignite.internal.util.typedef.internal.U; * */ public abstract class IgniteTxLocalStateAdapter implements IgniteTxLocalState { + /** */ + private static final Function<Integer, Set<Integer>> CREATE_INT_SET = k -> new HashSet<>(); + /** */ + private Map<Integer, Set<Integer>> touchedParts; + /** * @param cacheCtx Cache context. * @param tx Transaction. @@ -40,4 +51,19 @@ public abstract class IgniteTxLocalStateAdapter implements IgniteTxLocalState { cacheCtx.cache().metrics0().onTxRollback(durationNanos); } } + + /** {@inheritDoc} */ + @Override public Map<Integer, Set<Integer>> touchedPartitions() { + Map<Integer, Set<Integer>> parts = touchedParts; + + return parts != null ? 
Collections.unmodifiableMap(parts) : null; + } + + /** {@inheritDoc} */ + @Override public void touchPartition(int cacheId, int partId) { + if (touchedParts == null) + touchedParts = new HashMap<>(); + + touchedParts.computeIfAbsent(cacheId, CREATE_INT_SET).add(partId); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java index 1e0645f..87cc7cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java @@ -18,9 +18,7 @@ package org.apache.ignite.internal.processors.cache.transactions; import java.util.Collection; -import java.util.Map; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsUpdateCountersMap; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; /** @@ -52,14 +50,4 @@ public interface IgniteTxRemoteEx extends IgniteInternalTx { * @param cntrs Partition update indexes. */ public void setPartitionUpdateCounters(long[] cntrs); - - /** - * @param updCntrsMap Partition update counters map: cacheId -> partId -> updateCntr. - */ - public void updateCountersMap(Map<Integer, GridDhtPartitionsUpdateCountersMap> updCntrsMap); - - /** - * @return Partition update counters map: cacheId -> partId -> updateCntr. 
- */ - public Map<Integer, GridDhtPartitionsUpdateCountersMap> updateCountersMap(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java new file mode 100644 index 0000000..2ad4f94 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCounters; + +/** + * Values which should be tracked during transaction execution and applied on commit. 
+ */ +public class TxCounters { + /** Size changes for cache partitions made by transaction */ + private final ConcurrentMap<Integer, ConcurrentMap<Integer, AtomicLong>> sizeDeltas = new ConcurrentHashMap<>(); + /** Update counters for cache partitions in the end of transaction */ + private Map<Integer, PartitionUpdateCounters> updCntrs; + + /** + * Accumulates size change for cache partition. + * + * @param cacheId Cache id. + * @param part Partition id. + * @param delta Size delta. + */ + public void accumulateSizeDelta(int cacheId, int part, long delta) { + ConcurrentMap<Integer, AtomicLong> partDeltas = sizeDeltas.get(cacheId); + + if (partDeltas == null) { + ConcurrentMap<Integer, AtomicLong> partDeltas0 = + sizeDeltas.putIfAbsent(cacheId, partDeltas = new ConcurrentHashMap<>()); + + if (partDeltas0 != null) + partDeltas = partDeltas0; + } + + AtomicLong accDelta = partDeltas.get(part); + + if (accDelta == null) { + AtomicLong accDelta0 = partDeltas.putIfAbsent(part, accDelta = new AtomicLong()); + + if (accDelta0 != null) + accDelta = accDelta0; + } + + // here AtomicLong is used more as a container, + // every instance is assumed to be accessed in thread-confined manner + accDelta.set(accDelta.get() + delta); + } + + /** */ + public void updateCounters(Map<Integer, PartitionUpdateCounters> updCntrs) { + this.updCntrs = updCntrs; + } + + /** */ + public Map<Integer, PartitionUpdateCounters> updateCounters() { + return updCntrs != null ? Collections.unmodifiableMap(updCntrs) : Collections.emptyMap(); + } + + /** */ + public Map<Integer, ? 
extends Map<Integer, AtomicLong>> sizeDeltas() { + return Collections.unmodifiableMap(sizeDeltas); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeTest.java new file mode 100644 index 0000000..32709ff --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeTest.java @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; +import java.util.stream.IntStream; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; +import org.apache.ignite.internal.processors.query.IgniteSQLException; + +import static org.apache.ignite.cache.CachePeekMode.BACKUP; + +/** + * + */ +public class CacheMvccSizeTest extends CacheMvccAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.PARTITIONED; + } + + /** */ + private void checkSizeModificationByOperation(String sql, boolean commit, int expSizeDelta) throws Exception { + checkSizeModificationByOperation(c -> {}, cache -> cache.query(q(sql)).getAll(), commit, expSizeDelta); + } + + /** */ + private void checkSizeModificationByOperation(String initSql, String sql, boolean commit, + int expSizeDelta) throws Exception { + checkSizeModificationByOperation( + cache -> cache.query(q(initSql)).getAll(), + cache -> cache.query(q(sql)).getAll(), + commit, + expSizeDelta); + } + + /** */ + private void checkSizeModificationByOperation(Consumer<IgniteCache<?, ?>> inTx, boolean commit, + int expSizeDelta) throws Exception { + checkSizeModificationByOperation(c -> {}, inTx, commit, expSizeDelta); + } + + /** */ + private void checkSizeModificationByOperation(Consumer<IgniteCache<?, ?>> beforeTx, + 
Consumer<IgniteCache<?, ?>> inTx, boolean commit, int expSizeDelta) throws Exception { + IgniteCache<Object, Object> tbl0 = grid(0).cache("person"); + + tbl0.query(q("delete from person")); + + beforeTx.accept(tbl0); + + int initSize = tbl0.size(); + + tbl0.query(q("begin")); + + inTx.accept(tbl0); + + // size is not changed before commit + assertEquals(0, tbl0.size() - initSize); + + if (commit) + tbl0.query(q("commit")); + else + tbl0.query(q("rollback")); + + assertEquals(expSizeDelta, tbl0.size() - initSize); + assertEquals(tbl0.size(), table(grid(1)).size()); + + assertEquals(tbl0.size(), tbl0.size(BACKUP)); + assertEquals(tbl0.size(), table(grid(1)).size(BACKUP)); + } + + /** + * @throws Exception if failed. + */ + public void testSql() throws Exception { + startGridsMultiThreaded(2); + + createTable(grid(0)); + + checkSizeModificationByOperation("insert into person values(1, 'a')", true, 1); + + checkSizeModificationByOperation("insert into person values(1, 'a')", false, 0); + + checkSizeModificationByOperation( + personTbl -> personTbl.query(q("insert into person values(1, 'a')")), + personTbl -> { + try { + personTbl.query(q("insert into person values(1, 'a')")); + } + catch (Exception e) { + if (e.getCause() instanceof IgniteSQLException) { + assertEquals(IgniteQueryErrorCode.DUPLICATE_KEY, + ((IgniteSQLException)e.getCause()).statusCode()); + } + else { + e.printStackTrace(); + + fail("Unexpected exceptions"); + } + } + }, + true, 0); + + checkSizeModificationByOperation("merge into person(id, name) values(1, 'a')", true, 1); + + checkSizeModificationByOperation("merge into person(id, name) values(1, 'a')", false, 0); + + checkSizeModificationByOperation( + "insert into person values(1, 'a')", "merge into person(id, name) values(1, 'b')", true, 0); + + checkSizeModificationByOperation("update person set name = 'b' where id = 1", true, 0); + + checkSizeModificationByOperation( + "insert into person values(1, 'a')", "update person set name = 'b' where id = 
1", true, 0); + + checkSizeModificationByOperation( + "insert into person values(1, 'a')", "delete from person where id = 1", true, -1); + + checkSizeModificationByOperation( + "insert into person values(1, 'a')", "delete from person where id = 1", false, 0); + + checkSizeModificationByOperation("delete from person where id = 1", true, 0); + + checkSizeModificationByOperation( + "insert into person values(1, 'a')", "select * from person", true, 0); + + checkSizeModificationByOperation("select * from person", true, 0); + + checkSizeModificationByOperation( + "insert into person values(1, 'a')", "select * from person where id = 1 for update", true, 0); + + checkSizeModificationByOperation("select * from person where id = 1 for update", true, 0); + + checkSizeModificationByOperation(personTbl -> { + personTbl.query(q("insert into person values(1, 'a')")); + + personTbl.query(q("insert into person values(%d, 'b')", keyInSamePartition(grid(0), "person", 1))); + + personTbl.query(q("insert into person values(%d, 'c')", keyInDifferentPartition(grid(0), "person", 1))); + }, true, 3); + + checkSizeModificationByOperation(personTbl -> { + personTbl.query(q("insert into person values(1, 'a')")); + + personTbl.query(q("delete from person where id = 1")); + }, true, 0); + + checkSizeModificationByOperation(personTbl -> { + personTbl.query(q("insert into person values(1, 'a')")); + + personTbl.query(q("delete from person where id = 1")); + + personTbl.query(q("insert into person values(1, 'a')")); + }, true, 1); + + checkSizeModificationByOperation( + personTbl -> personTbl.query(q("insert into person values(1, 'a')")), + personTbl -> { + personTbl.query(q("delete from person where id = 1")); + + personTbl.query(q("insert into person values(1, 'a')")); + }, true, 0); + + checkSizeModificationByOperation(personTbl -> { + personTbl.query(q("merge into person(id, name) values(1, 'a')")); + + personTbl.query(q("delete from person where id = 1")); + }, true, 0); + + 
checkSizeModificationByOperation(personTbl -> { + personTbl.query(q("merge into person(id, name) values(1, 'a')")); + + personTbl.query(q("delete from person where id = 1")); + + personTbl.query(q("merge into person(id, name) values(1, 'a')")); + }, true, 1); + + checkSizeModificationByOperation( + personTbl -> personTbl.query(q("merge into person(id, name) values(1, 'a')")), + personTbl -> { + personTbl.query(q("delete from person where id = 1")); + + personTbl.query(q("merge into person(id, name) values(1, 'a')")); + }, true, 0); + } + + /** + * @throws Exception if failed. + */ + public void testInsertDeleteConcurrent() throws Exception { + startGridsMultiThreaded(2); + + IgniteCache<?, ?> tbl0 = createTable(grid(0)); + + SqlFieldsQuery insert = new SqlFieldsQuery("insert into person(id, name) values(?, 'a')"); + + SqlFieldsQuery delete = new SqlFieldsQuery("delete from person where id = ?"); + + CompletableFuture<Integer> insertFut = CompletableFuture.supplyAsync(() -> { + int cnt = 0; + + for (int i = 0; i < 1000; i++) + cnt += update(insert.setArgs(ThreadLocalRandom.current().nextInt(10)), tbl0); + + return cnt; + }); + + CompletableFuture<Integer> deleteFut = CompletableFuture.supplyAsync(() -> { + int cnt = 0; + + for (int i = 0; i < 1000; i++) + cnt += update(delete.setArgs(ThreadLocalRandom.current().nextInt(10)), tbl0); + + return cnt; + }); + + int expSize = insertFut.join() - deleteFut.join(); + + assertEquals(expSize, tbl0.size()); + assertEquals(expSize, table(grid(1)).size()); + + assertEquals(expSize, tbl0.size(BACKUP)); + assertEquals(expSize, table(grid(1)).size(BACKUP)); + } + + /** */ + private int update(SqlFieldsQuery qry, IgniteCache<?, ?> cache) { + try { + return Integer.parseInt(cache.query(qry).getAll().get(0).get(0).toString()); + } catch (Exception e) { + return 0; + } + } + + /** + * @throws Exception if failed. 
+ */ + public void testWriteConflictDoesNotChangeSize() throws Exception { + startGridsMultiThreaded(2); + + IgniteCache<?, ?> tbl0 = createTable(grid(0)); + + tbl0.query(q("insert into person values(1, 'a')")); + + tbl0.query(q("begin")); + + tbl0.query(q("delete from person where id = 1")); + + CompletableFuture<Void> conflictingStarted = new CompletableFuture<>(); + + CompletableFuture<Void> fut = CompletableFuture.runAsync(() -> { + tbl0.query(q("begin")); + + try { + tbl0.query(q("select * from person")).getAll(); + conflictingStarted.complete(null); + + tbl0.query(q("merge into person(id, name) values(1, 'b')")); + } + finally { + tbl0.query(q("commit")); + } + }); + + conflictingStarted.join(); + tbl0.query(q("commit")); + + try { + fut.join(); + } + catch (Exception e) { + if (e.getCause().getCause() instanceof IgniteSQLException) + assertTrue(e.getMessage().toLowerCase().contains("version mismatch")); + else { + e.printStackTrace(); + + fail("Unexpected exception"); + } + } + + assertEquals(0, tbl0.size()); + assertEquals(0, table(grid(1)).size()); + + assertEquals(0, tbl0.size(BACKUP)); + assertEquals(0, table(grid(1)).size(BACKUP)); + } + + /** + * @throws Exception if failed. 
+ */ + public void testDeleteChangesSizeAfterUnlock() throws Exception { + startGridsMultiThreaded(2); + + IgniteCache<?, ?> tbl0 = createTable(grid(0)); + + tbl0.query(q("insert into person values(1, 'a')")); + + tbl0.query(q("begin")); + + tbl0.query(q("select * from person where id = 1 for update")).getAll(); + + CompletableFuture<Thread> asyncThread = new CompletableFuture<>(); + + CompletableFuture<Void> fut = CompletableFuture.runAsync(() -> { + tbl0.query(q("begin")); + + try { + tbl0.query(q("select * from person")).getAll(); + + asyncThread.complete(Thread.currentThread()); + tbl0.query(q("delete from person where id = 1")); + } + finally { + tbl0.query(q("commit")); + } + }); + + Thread concThread = asyncThread.join(); + + // wait until concurrent thread blocks awaiting entry mvcc lock release + while (concThread.getState() == Thread.State.RUNNABLE && !Thread.currentThread().isInterrupted()); + + tbl0.query(q("commit")); + + fut.join(); + + assertEquals(0, tbl0.size()); + assertEquals(0, table(grid(1)).size()); + + assertEquals(0, tbl0.size(BACKUP)); + assertEquals(0, table(grid(1)).size(BACKUP)); + } + + /** + * @throws Exception if failed. + */ + public void testDataStreamerModifiesReplicatedCacheSize() throws Exception { + startGridsMultiThreaded(2); + + IgniteEx ignite = grid(0); + + ignite.createCache( + new CacheConfiguration<>("test") + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setCacheMode(CacheMode.REPLICATED) + ); + + try (IgniteDataStreamer<Object, Object> streamer = ignite.dataStreamer("test")) { + streamer.addData(1, "a"); + + streamer.addData(keyInDifferentPartition(ignite, "test", 1), "b"); + } + + assertEquals(2, ignite.cache("test").size()); + + assertEquals(1, grid(0).cache("test").localSize()); + assertEquals(1, grid(0).cache("test").localSize(BACKUP)); + + assertEquals(1, grid(1).cache("test").localSize()); + assertEquals(1, grid(1).cache("test").localSize(BACKUP)); + } + + /** + * @throws Exception if failed. 
+ */ + public void testSizeIsConsistentAfterRebalance() throws Exception { + IgniteEx ignite = startGrid(0); + + IgniteCache<?, ?> tbl = createTable(ignite); + + for (int i = 0; i < 100; i++) + tbl.query(q("insert into person values(?, ?)").setArgs(i, i)); + + startGrid(1); + + awaitPartitionMapExchange(); + + IgniteCache<?, ?> tbl0 = grid(0).cache("person"); + IgniteCache<?, ?> tbl1 = grid(1).cache("person"); + + assert tbl0.localSize() != 0 && tbl1.localSize() != 0; + + assertEquals(100, tbl1.size()); + assertEquals(100, tbl0.localSize() + tbl1.localSize()); + } + + /** + * @throws Exception If failed. + */ + public void testSizeIsConsistentAfterRebalanceDuringInsert() throws Exception { + IgniteEx ignite = startGrid(0); + + IgniteCache<?, ?> tbl = createTable(ignite); + + Future<?> f = null; + + for (int i = 0; i < 100; i++) { + if (i == 50) + f = ForkJoinPool.commonPool().submit(() -> startGrid(1)); + + tbl.query(q("insert into person values(?, ?)").setArgs(i, i)); + } + + f.get(); + + awaitPartitionMapExchange(); + + IgniteCache<?, ?> tbl0 = grid(0).cache("person"); + IgniteCache<?, ?> tbl1 = grid(1).cache("person"); + + assert tbl0.localSize() != 0 && tbl1.localSize() != 0; + + assertEquals(100, tbl1.size()); + assertEquals(100, tbl0.localSize() + tbl1.localSize()); + } + + /** */ + private static IgniteCache<?, ?> table(IgniteEx ignite) { + assert ignite.cachex("person").configuration().getAtomicityMode() == CacheAtomicityMode.TRANSACTIONAL; + assert ignite.cachex("person").configuration().getCacheMode() == CacheMode.REPLICATED; + + return ignite.cache("person"); + } + + /** */ + private static IgniteCache<?, ?> createTable(IgniteEx ignite) { + IgniteCache<?, ?> sqlNexus = ignite.getOrCreateCache(new CacheConfiguration<>("sqlNexus").setSqlSchema("PUBLIC")); + + sqlNexus.query(q("" + + "create table person(" + + " id int primary key," + + " name varchar" + + ") with \"atomicity=transactional,template=replicated,cache_name=person\"")); + + return 
table(ignite); + } + + /** */ + private static SqlFieldsQuery q(String fSql, Object... args) { + return new SqlFieldsQuery(String.format(fSql, args)); + } + + /** */ + private static int keyInSamePartition(Ignite ignite, String cacheName, int key) { + Affinity<Object> affinity = ignite.affinity(cacheName); + + return IntStream.iterate(key + 1, i -> i + 1) + .filter(i -> affinity.partition(i) == affinity.partition(key)) + .findFirst().getAsInt(); + } + + /** */ + private static int keyInDifferentPartition(Ignite ignite, String cacheName, int key) { + Affinity<Object> affinity = ignite.affinity(cacheName); + + return IntStream.iterate(key + 1, i -> i + 1) + .filter(i -> affinity.partition(i) != affinity.partition(key)) + .findFirst().getAsInt(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java index ab9d2e6..c8f7643 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java @@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccReplicatedSqlQu import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccReplicatedSqlTxQueriesTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccReplicatedSqlTxQueriesWithReducerTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccScanQueryWithConcurrentJdbcTransactionTest; +import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSizeTest; import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSizeWithConcurrentJdbcTransactionTest; 
import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccStreamingInsertTest; import org.apache.ignite.internal.processors.query.h2.GridIndexRebuildWithMvccEnabledSelfTest; @@ -50,9 +51,10 @@ public class IgniteCacheMvccSqlTestSuite extends TestSuite { public static TestSuite suite() { TestSuite suite = new TestSuite("IgniteCache SQL MVCC Test Suite"); - // Simle tests. + // Simple tests. suite.addTestSuite(CacheMvccDmlSimpleTest.class); suite.addTestSuite(SqlTransactionsCommandsWithMvccEnabledSelfTest.class); + suite.addTestSuite(CacheMvccSizeTest.class); suite.addTestSuite(GridIndexRebuildWithMvccEnabledSelfTest.class); @@ -83,7 +85,6 @@ public class IgniteCacheMvccSqlTestSuite extends TestSuite { suite.addTestSuite(CacheMvccPartitionedSqlCoordinatorFailoverTest.class); suite.addTestSuite(CacheMvccReplicatedSqlCoordinatorFailoverTest.class); - return suite; } }