IGNITE-8149: MVCC: correct processing of cache size. This closes #4654.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/00e935d3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/00e935d3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/00e935d3

Branch: refs/heads/ignite-5960
Commit: 00e935d33cd852cdc2a460f24ca0f0136ac5c59e
Parents: 40432b0
Author: ipavlukhin <vololo...@gmail.com>
Authored: Fri Sep 7 16:00:02 2018 +0300
Committer: devozerov <voze...@gridgain.com>
Committed: Fri Sep 7 16:00:02 2018 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |   4 +-
 .../processors/cache/GridCacheMapEntry.java     |  12 +
 .../cache/IgniteCacheOffheapManager.java        |   8 +
 .../cache/IgniteCacheOffheapManagerImpl.java    |  54 +-
 .../GridDistributedTxRemoteAdapter.java         |  51 +-
 .../dht/GridDhtPartitionsUpdateCountersMap.java | 119 -----
 .../dht/GridDhtTxAbstractEnlistFuture.java      |   2 +-
 .../distributed/dht/GridDhtTxFinishFuture.java  |   6 +-
 .../distributed/dht/GridDhtTxFinishRequest.java |  14 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |  51 +-
 .../cache/distributed/dht/GridDhtTxRemote.java  |  12 -
 .../dht/PartitionUpdateCounters.java            | 123 +++++
 .../persistence/GridCacheOffheapManager.java    |  13 +
 .../cache/transactions/IgniteInternalTx.java    |   8 +-
 .../cache/transactions/IgniteTxAdapter.java     |  57 ++-
 .../cache/transactions/IgniteTxHandler.java     |   2 +-
 .../transactions/IgniteTxLocalAdapter.java      | 104 ++--
 .../cache/transactions/IgniteTxLocalEx.java     |   8 +
 .../cache/transactions/IgniteTxLocalState.java  |  16 +
 .../transactions/IgniteTxLocalStateAdapter.java |  26 +
 .../cache/transactions/IgniteTxRemoteEx.java    |  12 -
 .../cache/transactions/TxCounters.java          |  82 ++++
 .../cache/mvcc/CacheMvccSizeTest.java           | 488 +++++++++++++++++++
 .../testsuites/IgniteCacheMvccSqlTestSuite.java |   5 +-
 24 files changed, 990 insertions(+), 287 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 2970e71..14f0548 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -70,7 +70,6 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffini
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsUpdateCountersMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
@@ -80,6 +79,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQuer
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryEnlistResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryFirstEnlistRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCounters;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
@@ -1062,7 +1062,7 @@ public class GridIoMessageFactory implements 
MessageFactory {
                 break;
 
             case 157:
-                msg = new GridDhtPartitionsUpdateCountersMap();
+                msg = new PartitionUpdateCounters();
 
                 break;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 8c2b939..714d4a0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1122,6 +1122,9 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             if (tx.local())
                 updateCntr = nextMvccPartitionCounter();
 
+            if (res.resultType() == ResultType.PREV_NULL)
+                tx.txCounters(true).accumulateSizeDelta(cctx.cacheId(), 
partition(), 1);
+
             if (cctx.group().persistenceEnabled() && cctx.group().walEnabled())
                 logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry(
                         cctx.cacheId(),
@@ -1217,6 +1220,9 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             if (tx.local())
                 updateCntr = nextMvccPartitionCounter();
 
+            if (res.resultType() == ResultType.PREV_NOT_NULL)
+                tx.txCounters(true).accumulateSizeDelta(cctx.cacheId(), 
partition(), -1);
+
             if (cctx.group().persistenceEnabled() && cctx.group().walEnabled())
                 logPtr = logTxUpdate(tx, null, 0, updateCntr);
 
@@ -4994,6 +5000,9 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                 if (updateCntr != null && updateCntr != 0)
                     updateCntr0 = updateCntr;
 
+                if (res.resultType() == ResultType.PREV_NOT_NULL)
+                    tx.txCounters(true).accumulateSizeDelta(cctx.cacheId(), 
entry.partition(), -1);
+
                 if (cctx.group().persistenceEnabled() && 
cctx.group().walEnabled())
                     logPtr = cctx.shared().wal().log(new DataRecord(new 
DataEntry(
                             cctx.cacheId(),
@@ -5281,6 +5290,9 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
                 updateCntr0 = tx.local() ? entry.nextMvccPartitionCounter() : 
updateCntr;
 
+                if (res.resultType() == ResultType.PREV_NULL)
+                    tx.txCounters(true).accumulateSizeDelta(cctx.cacheId(), 
entry.partition(), 1);
+
                 if (cctx.group().persistenceEnabled() && 
cctx.group().walEnabled())
                     logPtr = cctx.shared().wal().log(new DataRecord(new 
DataEntry(
                             cctx.cacheId(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index a021394..fdf42fe 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -620,6 +620,14 @@ public interface IgniteCacheOffheapManager {
         long fullSize();
 
         /**
+         * Updates size metric for particular cache.
+         *
+         * @param cacheId Cache ID.
+         * @param delta Size delta.
+         */
+        void updateSize(int cacheId, long delta);
+
+        /**
          * @return Update counter.
          */
         long updateCounter();

http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 13ad7e2..8811006 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1450,36 +1450,14 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
          * @param cacheId Cache ID.
          */
         void incrementSize(int cacheId) {
-            storageSize.incrementAndGet();
-
-            if (grp.sharedGroup()) {
-                AtomicLong size = cacheSizes.get(cacheId);
-
-                if (size == null) {
-                    AtomicLong old = cacheSizes.putIfAbsent(cacheId, size = 
new AtomicLong());
-
-                    if (old != null)
-                        size = old;
-                }
-
-                size.incrementAndGet();
-            }
+            updateSize(cacheId, 1);
         }
 
         /**
          * @param cacheId Cache ID.
          */
         void decrementSize(int cacheId) {
-            storageSize.decrementAndGet();
-
-            if (grp.sharedGroup()) {
-                AtomicLong size = cacheSizes.get(cacheId);
-
-                if (size == null)
-                    return;
-
-                size.decrementAndGet();
-            }
+            updateSize(cacheId, -1);
         }
 
         /** {@inheritDoc} */
@@ -1517,6 +1495,24 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
         }
 
         /** {@inheritDoc} */
+        @Override public void updateSize(int cacheId, long delta) {
+            storageSize.addAndGet(delta);
+
+            if (grp.sharedGroup()) {
+                AtomicLong size = cacheSizes.get(cacheId);
+
+                if (size == null) {
+                    AtomicLong old = cacheSizes.putIfAbsent(cacheId, size = 
new AtomicLong());
+
+                    if (old != null)
+                        size = old;
+                }
+
+                size.addAndGet(delta);
+            }
+        }
+
+        /** {@inheritDoc} */
         @Override public long nextUpdateCounter() {
             return cntr.incrementAndGet();
         }
@@ -1953,8 +1949,6 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
 
                 assert !old;
 
-                incrementSize(cctx.cacheId());
-
                 GridCacheQueryManager qryMgr = cctx.queries();
 
                 if (qryMgr.enabled())
@@ -2284,11 +2278,13 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
 
                 rowStore.removeRow(row.link());
 
-                decrementSize(cctx.cacheId());
-
                 if (first)
                     first = false;
             }
+
+            // first == true means there were no row versions
+            if (!first)
+                decrementSize(cctx.cacheId());
         }
 
         /** {@inheritDoc} */
@@ -2318,8 +2314,6 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
 
                         rowStore.removeRow(cleanupRow.link());
 
-                        decrementSize(cctx.cacheId());
-
                         res++;
                     }
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index dd6ea48..8e96ae2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -51,7 +51,8 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsUpdateCountersMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCounters;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
 import 
org.apache.ignite.internal.processors.cache.persistence.StorageException;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -61,6 +62,7 @@ import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxRemoteEx;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxRemoteState;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState;
+import org.apache.ignite.internal.processors.cache.transactions.TxCounters;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
 import 
org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
@@ -807,7 +809,7 @@ public abstract class GridDistributedTxRemoteAdapter 
extends IgniteTxAdapter
                                 }
                             }
 
-                            updateLocalCounters();
+                            applyTxCounters();
 
                             if (!near() && !F.isEmpty(dataEntries) && 
cctx.wal() != null) {
                                 // Set new update counters for data entries 
received from persisted tx entries.
@@ -845,29 +847,34 @@ public abstract class GridDistributedTxRemoteAdapter 
extends IgniteTxAdapter
         }
     }
 
-    /**
-     * Applies update counters to the local partitions.
-     */
-    private void updateLocalCounters() {
-        Map<Integer, GridDhtPartitionsUpdateCountersMap> updCntrsMap = 
updateCountersMap();
+    /** {@inheritDoc} */
+    @Override protected void applyTxCounters() {
+        super.applyTxCounters();
+
+        TxCounters txCntrs = txCounters(false);
 
-        if (F.isEmpty(updCntrsMap))
+        if (txCntrs == null)
             return;
 
-        for (Map.Entry<Integer, GridDhtPartitionsUpdateCountersMap> entry : 
updCntrsMap.entrySet()) {
-            GridCacheContext cacheCtx = cctx.cacheContext(entry.getKey());
+        Map<Integer, PartitionUpdateCounters> updCntrs = 
txCntrs.updateCounters();
 
-            GridDhtPartitionsUpdateCountersMap cacheUpdCntrs = 
entry.getValue();
+        for (Map.Entry<Integer, PartitionUpdateCounters> entry : 
updCntrs.entrySet()) {
+            int cacheId = entry.getKey();
 
-            assert cacheUpdCntrs != null && 
!F.isEmpty(cacheUpdCntrs.updateCounters());
+            GridDhtPartitionTopology top = 
cctx.cacheContext(cacheId).topology();
 
-            for (Map.Entry<Integer, Long> e : 
cacheUpdCntrs.updateCounters().entrySet()) {
-                Long updCntr = e.getValue();
-                GridDhtLocalPartition part = 
cacheCtx.topology().localPartition(e.getKey());
+            Map<Integer, Long> cacheUpdCntrs = 
entry.getValue().updateCounters();
 
-                assert part != null && updCntr != null && updCntr > 0;
+            assert cacheUpdCntrs != null;
 
-                part.updateCounter(updCntr);
+            for (Map.Entry<Integer, Long> e : cacheUpdCntrs.entrySet()) {
+                long updCntr = e.getValue();
+
+                GridDhtLocalPartition dhtPart = top.localPartition(e.getKey());
+
+                assert dhtPart != null && updCntr > 0;
+
+                dhtPart.updateCounter(updCntr);
             }
         }
     }
@@ -1023,16 +1030,6 @@ public abstract class GridDistributedTxRemoteAdapter 
extends IgniteTxAdapter
     }
 
     /** {@inheritDoc} */
-    @Override public void updateCountersMap(Map<Integer, 
GridDhtPartitionsUpdateCountersMap> updCntrsMap) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public Map<Integer, GridDhtPartitionsUpdateCountersMap> 
updateCountersMap() {
-        return null;
-    }
-
-    /** {@inheritDoc} */
     @Override public String toString() {
         return 
GridToStringBuilder.toString(GridDistributedTxRemoteAdapter.class, this, 
"super", super.toString());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsUpdateCountersMap.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsUpdateCountersMap.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsUpdateCountersMap.java
deleted file mode 100644
index 7a345d1..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsUpdateCountersMap.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.ignite.internal.GridDirectMap;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- * Partitions update counters message.
- */
-public class GridDhtPartitionsUpdateCountersMap implements Message {
-    /** */
-    private static final long serialVersionUID = -4599730112233297219L;
-
-    /** Map of update counters made by this tx. Mapping: partId -> updCntr. */
-    @GridDirectMap(keyType = Integer.class, valueType = Long.class)
-    private  Map<Integer, Long> updCntrs;
-
-    /**
-     *
-     */
-    public GridDhtPartitionsUpdateCountersMap() {
-        updCntrs = new HashMap<>();
-    }
-
-    /**
-     * @return Update counters.
-     */
-    public Map<Integer, Long> updateCounters() {
-        return updCntrs;
-    }
-
-    /**
-     * @param updCntrs Update counters.
-     */
-    public void updateCounters(Map<Integer, Long> updCntrs) {
-        this.updCntrs = updCntrs;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeMap("updCntrs", updCntrs, 
MessageCollectionItemType.INT, MessageCollectionItemType.LONG))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        switch (reader.state()) {
-            case 0:
-                updCntrs = reader.readMap("updCntrs", 
MessageCollectionItemType.INT, MessageCollectionItemType.LONG, false);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return 
reader.afterMessageRead(GridDhtPartitionsUpdateCountersMap.class);
-    }
-
-    /** {@inheritDoc} */
-    @Override public short directType() {
-        return 157;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 1;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onAckReceived() {
-        // No-op.
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
index bb11df5..a3471c7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
@@ -593,7 +593,7 @@ public abstract class GridDhtTxAbstractEnlistFuture extends 
GridCacheFutureAdapt
 
         int part = cctx.affinity().partition(key);
 
-        tx.addPartitionCountersMapping(cacheId, part);
+        tx.touchPartition(cacheId, part);
 
         if (F.isEmpty(backups))
             return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index c0a3845..4c72e6f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -457,10 +457,10 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentity
             for (IgniteTxEntry e : dhtMapping.entries())
                 updCntrs.add(e.updateCounter());
 
-            Map<Integer, GridDhtPartitionsUpdateCountersMap> updCntrsMap = 
null;
+            Map<Integer, PartitionUpdateCounters> updCntrsForNode = null;
 
             if (dhtMapping.queryUpdate() && commit)
-                updCntrsMap = tx.updateCountersForNode(n);
+                updCntrsForNode = tx.filterUpdateCountersForBackupNode(n);
 
             GridDhtTxFinishRequest req = new GridDhtTxFinishRequest(
                 tx.nearNodeId(),
@@ -489,7 +489,7 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCacheCompoundIdentity
                 false,
                 false,
                 mvccSnapshot,
-                updCntrsMap);
+                updCntrsForNode);
 
             req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : 
tx.xidVersion());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index b1908ff..8e9ece6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -73,9 +73,10 @@ public class GridDhtTxFinishRequest extends 
GridDistributedTxFinishRequest {
     /** */
     private MvccSnapshot mvccSnapshot;
 
-    /** Map of update counters made by this tx. Mapping: cacheId -> partId -> 
updCntr. */
-    @GridDirectMap(keyType = Integer.class, valueType = 
GridDhtPartitionsUpdateCountersMap.class)
-    private Map<Integer, GridDhtPartitionsUpdateCountersMap> updCntrs;
+    /** */
+    @GridDirectMap(keyType = Integer.class, valueType = 
PartitionUpdateCounters.class)
+    private Map<Integer, PartitionUpdateCounters> updCntrs;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -220,7 +221,7 @@ public class GridDhtTxFinishRequest extends 
GridDistributedTxFinishRequest {
         boolean retVal,
         boolean waitRemoteTxs,
         MvccSnapshot mvccSnapshot,
-        Map<Integer, GridDhtPartitionsUpdateCountersMap> updCntrs
+        Map<Integer, PartitionUpdateCounters> updCntrs
     ) {
         this(nearNodeId,
             futId,
@@ -362,10 +363,11 @@ public class GridDhtTxFinishRequest extends 
GridDistributedTxFinishRequest {
     public void needReturnValue(boolean retVal) {
         setFlag(retVal, NEED_RETURN_VALUE_FLAG_MASK);
     }
+
     /**
-     * @return Partition update counters map.
+     * @return Partition counters update deferred until transaction commit.
      */
-    public Map<Integer, GridDhtPartitionsUpdateCountersMap> 
updateCountersMap() {
+    public Map<Integer, PartitionUpdateCounters> updateCounters() {
         return updCntrs;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 11b46a0..613f160 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -20,6 +20,7 @@ package 
org.apache.ignite.internal.processors.cache.distributed.dht;
 import java.io.Externalizable;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -31,6 +32,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
@@ -42,6 +44,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPr
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
+import org.apache.ignite.internal.processors.cache.transactions.TxCounters;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.GridLeanMap;
@@ -907,7 +910,7 @@ public abstract class GridDhtTxLocalAdapter extends 
IgniteTxLocalAdapter {
      *
      * @return Current lock future or null if it's safe to roll back.
      */
-    public @Nullable IgniteInternalFuture<?> tryRollbackAsync() {
+    @Nullable public IgniteInternalFuture<?> tryRollbackAsync() {
         while (true) {
             final IgniteInternalFuture fut = lockFut;
 
@@ -937,6 +940,52 @@ public abstract class GridDhtTxLocalAdapter extends 
IgniteTxLocalAdapter {
         return prepFut;
     }
 
+    /**
+     * @param node Backup node.
+     * @return Partition counters map for the given backup node.
+     */
+    public Map<Integer, PartitionUpdateCounters> 
filterUpdateCountersForBackupNode(ClusterNode node) {
+        TxCounters txCntrs = txCounters(false);
+
+        if (txCntrs == null)
+            return null;
+
+        Map<Integer, PartitionUpdateCounters> updCntrs = 
txCntrs.updateCounters();
+
+        Map<Integer, PartitionUpdateCounters> res = new HashMap<>();
+
+        AffinityTopologyVersion top = topologyVersionSnapshot();
+
+        for (Map.Entry<Integer, PartitionUpdateCounters> entry : 
updCntrs.entrySet()) {
+            Integer cacheId = entry.getKey();
+
+            Map<Integer, Long> partsCntrs = entry.getValue().updateCounters();
+
+            assert !F.isEmpty(partsCntrs);
+
+            GridCacheAffinityManager affinity = 
cctx.cacheContext(cacheId).affinity();
+
+            Map<Integer, Long> resCntrs = new HashMap<>(partsCntrs.size());
+
+            for (Map.Entry<Integer, Long> e : partsCntrs.entrySet()) {
+                Integer p = e.getKey();
+
+                Long cntr = e.getValue();
+
+                if (affinity.backupByPartition(node, p, top)) {
+                    assert cntr != null && cntr > 0 : cntr;
+
+                    resCntrs.put(p, cntr);
+                }
+            }
+
+            if (!resCntrs.isEmpty())
+                res.put(cacheId, new PartitionUpdateCounters(resCntrs));
+        }
+
+        return res;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return GridToStringBuilder.toString(GridDhtTxLocalAdapter.class, this, 
"nearNodes", nearMap.keySet(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index 08ecf28..9a1763b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -79,8 +79,6 @@ public class GridDhtTxRemote extends 
GridDistributedTxRemoteAdapter {
     /** Store write through flag. */
     private boolean storeWriteThrough;
 
-    /** Map of update counters made by this tx. Mapping: cacheId -> partId -> 
updCntr. */
-    private Map<Integer, GridDhtPartitionsUpdateCountersMap> updCntrs;
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -503,16 +501,6 @@ public class GridDhtTxRemote extends 
GridDistributedTxRemoteAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void updateCountersMap(Map<Integer, 
GridDhtPartitionsUpdateCountersMap> updCntrsMap) {
-       this.updCntrs = updCntrsMap;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Map<Integer, GridDhtPartitionsUpdateCountersMap> 
updateCountersMap() {
-        return updCntrs;
-    }
-
-    /** {@inheritDoc} */
     @Override public String toString() {
         return GridToStringBuilder.toString(GridDhtTxRemote.class, this, 
"super", super.toString());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCounters.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCounters.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCounters.java
new file mode 100644
index 0000000..5b1eccd
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCounters.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Partition update counters message.
+ */
+public class PartitionUpdateCounters implements Message {
+    /** */
+    private static final long serialVersionUID = 193442457510062844L;
+
+    /** Map of update counters made by this tx. Mapping: partId -> updCntr. */
+    @GridDirectMap(keyType = Integer.class, valueType = Long.class)
+    private Map<Integer, Long> updCntrs;
+
+    /** */
+    public PartitionUpdateCounters() {
+        // No-op.
+    }
+
+    /**
+     * @param updCntrs Update counters map.
+     */
+    public PartitionUpdateCounters(Map<Integer, Long> updCntrs) {
+        this.updCntrs = updCntrs;
+    }
+
+    /**
+     * @return Update counters.
+     */
+    public Map<Integer, Long> updateCounters() {
+        return updCntrs;
+    }
+
+    /**
+     * @param updCntrs Update counters.
+     */
+    public void updateCounters(Map<Integer, Long> updCntrs) {
+        this.updCntrs = updCntrs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeMap("updCntrs", updCntrs, 
MessageCollectionItemType.INT, MessageCollectionItemType.LONG))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                updCntrs = reader.readMap("updCntrs", 
MessageCollectionItemType.INT, MessageCollectionItemType.LONG, false);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(PartitionUpdateCounters.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 157;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 199efcb..04476ad 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -1563,6 +1563,19 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
         }
 
         /** {@inheritDoc} */
+        @Override public void updateSize(int cacheId, long delta) {
+            try {
+                CacheDataStore delegate0 = init0(false);
+
+                if (delegate0 != null)
+                    delegate0.updateSize(cacheId, delta);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
         @Override public long updateCounter() {
             try {
                 CacheDataStore delegate0 = init0(true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 4acf078..05ebe5d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -650,4 +650,10 @@ public interface IgniteInternalTx {
      * @return Mvcc snapshot.
      */
     public MvccSnapshot mvccSnapshot();
-}
\ No newline at end of file
+
+    /**
+     * @return Transaction counters.
+     * @param createIfAbsent {@code True} if non-null instance is needed.
+     */
+    @Nullable public TxCounters txCounters(boolean createIfAbsent);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index ee5a58e..c6d1991 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import javax.cache.expiry.ExpiryPolicy;
@@ -62,10 +63,12 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
+import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
-import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
 import 
org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -136,6 +139,10 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
     private static final AtomicReferenceFieldUpdater<IgniteTxAdapter, 
FinalizationStatus> FINALIZING_UPD =
         AtomicReferenceFieldUpdater.newUpdater(IgniteTxAdapter.class, 
FinalizationStatus.class, "finalizing");
 
+    /** */
+    private static final AtomicReferenceFieldUpdater<IgniteTxAdapter, 
TxCounters> TX_COUNTERS_UPD =
+        AtomicReferenceFieldUpdater.newUpdater(IgniteTxAdapter.class, 
TxCounters.class, "txCounters");
+
     /** Logger. */
     protected static IgniteLogger log;
 
@@ -275,6 +282,9 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
     @GridToStringExclude
     private volatile IgniteInternalFuture rollbackFut;
 
+    /** */
+    private volatile TxCounters txCounters = new TxCounters();
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -2015,6 +2025,46 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
     public abstract void addActiveCache(GridCacheContext cacheCtx, boolean 
recovery) throws IgniteCheckedException;
 
     /** {@inheritDoc} */
+    @Nullable @Override public TxCounters txCounters(boolean createIfAbsent) {
+        if (createIfAbsent && txCounters == null)
+            TX_COUNTERS_UPD.compareAndSet(this, null, new TxCounters());
+
+        return txCounters;
+    }
+
+    /**
+     * Make counters accumulated during transaction visible outside of 
transaciton.
+     */
+    protected void applyTxCounters() {
+        TxCounters txCntrs = txCounters(false);
+
+        if (txCntrs == null)
+            return;
+
+        Map<Integer, ? extends Map<Integer, AtomicLong>> sizeDeltas = 
txCntrs.sizeDeltas();
+
+        for (Map.Entry<Integer, ? extends Map<Integer, AtomicLong>> entry : 
sizeDeltas.entrySet()) {
+            Integer cacheId = entry.getKey();
+            Map<Integer, AtomicLong> partDeltas = entry.getValue();
+
+            assert !F.isEmpty(partDeltas);
+
+            GridDhtPartitionTopology top = 
cctx.cacheContext(cacheId).topology();
+
+            for (Map.Entry<Integer, AtomicLong> e : partDeltas.entrySet()) {
+                Integer p = e.getKey();
+                long delta = e.getValue().get();
+
+                GridDhtLocalPartition dhtPart = top.localPartition(p);
+
+                assert dhtPart != null;
+
+                dhtPart.dataStore().updateSize(cacheId, delta);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return GridToStringBuilder.toString(IgniteTxAdapter.class, this,
             "duration", (U.currentTimeMillis() - startTime) + "ms",
@@ -2287,6 +2337,11 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
         }
 
         /** {@inheritDoc} */
+        @Nullable @Override public TxCounters txCounters(boolean 
createIfAbsent) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
         @Override public IgniteTxState txState() {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 7541b43..32f4dd4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -1391,7 +1391,7 @@ public class IgniteTxHandler {
                 tx.setPartitionUpdateCounters(
                     req.partUpdateCounters() != null ? 
req.partUpdateCounters().array() : null);
 
-                tx.updateCountersMap(req.updateCountersMap());
+                tx.txCounters(true).updateCounters(req.updateCounters());
 
                 tx.commitRemoteTx();
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index b86273f..bfe67ee 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -24,7 +24,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import javax.cache.expiry.Duration;
@@ -32,7 +31,6 @@ import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -59,7 +57,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsUpdateCountersMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCounters;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import 
org.apache.ignite.internal.processors.cache.persistence.StorageException;
@@ -162,9 +160,6 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter implements Ig
     /** */
     private GridLongList mvccWaitTxs;
 
-    /** Update counters map */
-    private Map<Integer, Map<Integer, Long>> updCntrs;
-
     /** */
     private volatile boolean qryEnlisted;
 
@@ -918,7 +913,7 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter implements Ig
                     }
                 }
 
-                updateLocalPartitionCounters();
+                applyTxCounters();
 
                 if (ptr != null && !cctx.tm().logTxRecords())
                     cctx.wal().flush(ptr, false);
@@ -1563,6 +1558,11 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter implements Ig
     }
 
     /** {@inheritDoc} */
+    @Override public void touchPartition(int cacheId, int partId) {
+        txState.touchPartition(cacheId, partId);
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return GridToStringBuilder.toString(IgniteTxLocalAdapter.class, this, 
"super", super.toString(),
             "size", allEntries().size());
@@ -1644,74 +1644,28 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter implements Ig
     }
 
     /**
-     * @return Partition counters map for the given backup node.
+     * Merges mvcc update counters to the partition update counters. For mvcc 
transactions we update partitions
+     * counters only on commit phase.
      */
-    public Map<Integer, GridDhtPartitionsUpdateCountersMap> 
updateCountersForNode(ClusterNode node) {
-        if (F.isEmpty(updCntrs))
+    private Map<Integer, PartitionUpdateCounters> 
applyAndCollectLocalUpdateCounters() {
+        if (F.isEmpty(txState.touchedPartitions()))
             return null;
 
-        Map<Integer, GridDhtPartitionsUpdateCountersMap> res = new HashMap<>();
+        HashMap<Integer, PartitionUpdateCounters> updCntrs = new HashMap<>();
 
-        for (Map.Entry<Integer, Map<Integer, Long>> entry : 
updCntrs.entrySet()) {
-            Map<Integer, Long> partsCntrs = entry.getValue();
+        for (Map.Entry<Integer, Set<Integer>> entry : 
txState.touchedPartitions().entrySet()) {
+            Integer cacheId = entry.getKey();
 
-            assert !F.isEmpty(partsCntrs);
+            Set<Integer> parts = entry.getValue();
 
-            GridCacheContext ctx0 = cctx.cacheContext(entry.getKey());
+            assert !F.isEmpty(parts);
 
-            GridDhtPartitionsUpdateCountersMap resBackupCntrs = new 
GridDhtPartitionsUpdateCountersMap();
+            GridCacheContext ctx0 = cctx.cacheContext(cacheId);
 
-            for (Map.Entry<Integer, Long> e : partsCntrs.entrySet()) {
-                Long cntr = partsCntrs.get(e.getKey());
+            Map<Integer, Long> partCntrs = new HashMap<>(parts.size());
 
-                if (ctx0.affinity().backupByPartition(node, e.getKey(), 
topologyVersionSnapshot())) {
-                    assert cntr != null && cntr > 0 : cntr;
-
-                    resBackupCntrs.updateCounters().put(e.getKey(), cntr);
-                }
-            }
-
-            if (!resBackupCntrs.updateCounters().isEmpty())
-                res.put(entry.getKey(), resBackupCntrs);
-        }
-
-        return res;
-    }
-
-    /**
-     * @param cacheId Cache id.
-     * @param part Partition id.
-     */
-    @SuppressWarnings("Java8MapApi")
-    public void addPartitionCountersMapping(Integer cacheId, Integer part) {
-        if (updCntrs == null)
-            updCntrs = new ConcurrentHashMap<>();
-
-        Map<Integer, Long> partUpdCntrs = updCntrs.get(cacheId);
-
-        if (partUpdCntrs == null)
-            updCntrs.put(cacheId, partUpdCntrs = new ConcurrentHashMap<>());
-
-        partUpdCntrs.put(part, 0L);
-    }
-
-    /**
-     * Merges mvcc update counters to the partition update counters. For mvcc 
transactions we update partitions
-     * counters only on commit phase.
-     */
-    private void updateLocalPartitionCounters() {
-        if (F.isEmpty(updCntrs))
-            return;
-
-        for (Map.Entry<Integer, Map<Integer, Long>> entry : 
updCntrs.entrySet()) {
-            Map<Integer, Long> partsCntrs = entry.getValue();
-
-            assert !F.isEmpty(partsCntrs);
-
-            GridCacheContext ctx0 = cctx.cacheContext(entry.getKey());
-
-            for (Map.Entry<Integer, Long> e : partsCntrs.entrySet()) {
-                GridDhtLocalPartition dhtPart = 
ctx0.topology().localPartition(e.getKey());
+            for (Integer p : parts) {
+                GridDhtLocalPartition dhtPart = 
ctx0.topology().localPartition(p);
 
                 assert dhtPart != null;
 
@@ -1719,11 +1673,13 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter implements Ig
 
                 dhtPart.updateCounter(cntr);
 
-                Long prev = partsCntrs.put(e.getKey(), cntr);
-
-                assert prev == 0L : prev;
+                partCntrs.put(p, cntr);
             }
+
+            updCntrs.put(cacheId, new PartitionUpdateCounters(partCntrs));
         }
+
+        return updCntrs;
     }
 
     /**
@@ -1747,6 +1703,16 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter implements Ig
         }
     }
 
+    /** {@inheritDoc} */
+    @Override protected void applyTxCounters() {
+        super.applyTxCounters();
+
+        Map<Integer, PartitionUpdateCounters> updCntrs = 
applyAndCollectLocalUpdateCounters();
+
+        // remember counters for subsequent sending to backups
+        txCounters(true).updateCounters(updCntrs);
+    }
+
     /**
      * Post-lock closure alias.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index b61b1a9..651be60 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -56,4 +56,12 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
      * @throws IgniteCheckedException If finish failed.
      */
     public boolean localFinish(boolean commit, boolean clearThreadMap) throws 
IgniteCheckedException;
+
+    /**
+     * Remembers that particular cache partition was touched by current tx.
+     *
+     * @param cacheId Cache id.
+     * @param partId Partition id.
+     */
+    public void touchPartition(int cacheId, int partId);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
index 123d396..01eb4f4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.internal.processors.cache.transactions;
 
+import java.util.Map;
+import java.util.Set;
+
 /**
  *
  */
@@ -41,4 +44,17 @@ public interface IgniteTxLocalState extends IgniteTxState {
      *
      */
     public void seal();
+
+    /**
+     * @return Cache partitions touched by current tx.
+     */
+    public Map<Integer, Set<Integer>> touchedPartitions();
+
+    /**
+     * Remembers that particular cache partition was touched by current tx.
+     *
+     * @param cacheId Cache id.
+     * @param partId Partition id.
+     */
+    public void touchPartition(int cacheId, int partId);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java
index 4943aac..9c6ef8f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java
@@ -17,7 +17,13 @@
 
 package org.apache.ignite.internal.processors.cache.transactions;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
@@ -25,6 +31,11 @@ import org.apache.ignite.internal.util.typedef.internal.U;
  *
  */
 public abstract class IgniteTxLocalStateAdapter implements IgniteTxLocalState {
+    /** */
+    private static final Function<Integer, Set<Integer>> CREATE_INT_SET = k -> 
new HashSet<>();
+    /** */
+    private Map<Integer, Set<Integer>> touchedParts;
+
     /**
      * @param cacheCtx Cache context.
      * @param tx Transaction.
@@ -40,4 +51,19 @@ public abstract class IgniteTxLocalStateAdapter implements 
IgniteTxLocalState {
                 cacheCtx.cache().metrics0().onTxRollback(durationNanos);
         }
     }
+
+    /** {@inheritDoc} */
+    @Override public Map<Integer, Set<Integer>> touchedPartitions() {
+        Map<Integer, Set<Integer>> parts = touchedParts;
+
+        return parts != null ? Collections.unmodifiableMap(parts) : null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void touchPartition(int cacheId, int partId) {
+        if (touchedParts == null)
+            touchedParts = new HashMap<>();
+
+        touchedParts.computeIfAbsent(cacheId, CREATE_INT_SET).add(partId);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
index 1e0645f..87cc7cc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
@@ -18,9 +18,7 @@
 package org.apache.ignite.internal.processors.cache.transactions;
 
 import java.util.Collection;
-import java.util.Map;
 import org.apache.ignite.IgniteCheckedException;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsUpdateCountersMap;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 
 /**
@@ -52,14 +50,4 @@ public interface IgniteTxRemoteEx extends IgniteInternalTx {
      * @param cntrs Partition update indexes.
      */
     public void setPartitionUpdateCounters(long[] cntrs);
-
-    /**
-     * @param updCntrsMap Partition update counters map: cacheId -> partId -> 
updateCntr.
-     */
-    public void updateCountersMap(Map<Integer, 
GridDhtPartitionsUpdateCountersMap> updCntrsMap);
-
-    /**
-     * @return Partition update counters map: cacheId -> partId -> updateCntr.
-     */
-    public Map<Integer, GridDhtPartitionsUpdateCountersMap> 
updateCountersMap();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java
new file mode 100644
index 0000000..2ad4f94
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCounters;
+
+/**
+ * Values which should be tracked during transaction execution and applied on 
commit.
+ */
+public class TxCounters {
+    /** Size changes for cache partitions made by transaction */
+    private final ConcurrentMap<Integer, ConcurrentMap<Integer, AtomicLong>> 
sizeDeltas = new ConcurrentHashMap<>();
+    /** Update counters for cache partitions in the end of transaction */
+    private Map<Integer, PartitionUpdateCounters> updCntrs;
+
+    /**
+     * Accumulates size change for cache partition.
+     *
+     * @param cacheId Cache id.
+     * @param part Partition id.
+     * @param delta Size delta.
+     */
+    public void accumulateSizeDelta(int cacheId, int part, long delta) {
+        ConcurrentMap<Integer, AtomicLong> partDeltas = 
sizeDeltas.get(cacheId);
+
+        if (partDeltas == null) {
+            ConcurrentMap<Integer, AtomicLong> partDeltas0 =
+                sizeDeltas.putIfAbsent(cacheId, partDeltas = new 
ConcurrentHashMap<>());
+
+            if (partDeltas0 != null)
+                partDeltas = partDeltas0;
+        }
+
+        AtomicLong accDelta = partDeltas.get(part);
+
+        if (accDelta == null) {
+            AtomicLong accDelta0 = partDeltas.putIfAbsent(part, accDelta = new 
AtomicLong());
+
+            if (accDelta0 != null)
+                accDelta = accDelta0;
+        }
+
+        // here AtomicLong is used more as a container,
+        // every instance is assumed to be accessed in thread-confined manner
+        accDelta.set(accDelta.get() + delta);
+    }
+
+    /** */
+    public void updateCounters(Map<Integer, PartitionUpdateCounters> updCntrs) 
{
+        this.updCntrs = updCntrs;
+    }
+
+    /** */
+    public Map<Integer, PartitionUpdateCounters> updateCounters() {
+        return updCntrs != null ? Collections.unmodifiableMap(updCntrs) : 
Collections.emptyMap();
+    }
+
+    /** */
+    public Map<Integer, ? extends Map<Integer, AtomicLong>> sizeDeltas() {
+        return Collections.unmodifiableMap(sizeDeltas);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeTest.java
new file mode 100644
index 0000000..32709ff
--- /dev/null
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeTest.java
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+
+import static org.apache.ignite.cache.CachePeekMode.BACKUP;
+
+/**
+ *
+ */
+public class CacheMvccSizeTest extends CacheMvccAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /** */
+    private void checkSizeModificationByOperation(String sql, boolean commit, 
int expSizeDelta) throws Exception {
+        checkSizeModificationByOperation(c -> {}, cache -> 
cache.query(q(sql)).getAll(), commit, expSizeDelta);
+    }
+
+    /** */
+    private void checkSizeModificationByOperation(String initSql, String sql, 
boolean commit,
+        int expSizeDelta) throws Exception {
+        checkSizeModificationByOperation(
+            cache -> cache.query(q(initSql)).getAll(),
+            cache -> cache.query(q(sql)).getAll(),
+            commit,
+            expSizeDelta);
+    }
+
+    /** */
+    private void checkSizeModificationByOperation(Consumer<IgniteCache<?, ?>> 
inTx, boolean commit,
+        int expSizeDelta) throws Exception {
+        checkSizeModificationByOperation(c -> {}, inTx, commit, expSizeDelta);
+    }
+
+    /** */
+    private void checkSizeModificationByOperation(Consumer<IgniteCache<?, ?>> 
beforeTx,
+        Consumer<IgniteCache<?, ?>> inTx, boolean commit, int expSizeDelta) 
throws Exception {
+        IgniteCache<Object, Object> tbl0 = grid(0).cache("person");
+
+        tbl0.query(q("delete from person"));
+
+        beforeTx.accept(tbl0);
+
+        int initSize = tbl0.size();
+
+        tbl0.query(q("begin"));
+
+        inTx.accept(tbl0);
+
+        // size is not changed before commit
+        assertEquals(0, tbl0.size() - initSize);
+
+        if (commit)
+            tbl0.query(q("commit"));
+        else
+            tbl0.query(q("rollback"));
+
+        assertEquals(expSizeDelta, tbl0.size() - initSize);
+        assertEquals(tbl0.size(), table(grid(1)).size());
+
+        assertEquals(tbl0.size(), tbl0.size(BACKUP));
+        assertEquals(tbl0.size(), table(grid(1)).size(BACKUP));
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testSql() throws Exception {
+        startGridsMultiThreaded(2);
+
+        createTable(grid(0));
+
+        checkSizeModificationByOperation("insert into person values(1, 'a')", 
true, 1);
+
+        checkSizeModificationByOperation("insert into person values(1, 'a')", 
false, 0);
+
+        checkSizeModificationByOperation(
+            personTbl -> personTbl.query(q("insert into person values(1, 
'a')")),
+            personTbl -> {
+                try {
+                    personTbl.query(q("insert into person values(1, 'a')"));
+                }
+                catch (Exception e) {
+                    if (e.getCause() instanceof IgniteSQLException) {
+                        assertEquals(IgniteQueryErrorCode.DUPLICATE_KEY,
+                            ((IgniteSQLException)e.getCause()).statusCode());
+                    }
+                    else {
+                        e.printStackTrace();
+
+                        fail("Unexpected exceptions");
+                    }
+                }
+            },
+            true, 0);
+
+        checkSizeModificationByOperation("merge into person(id, name) 
values(1, 'a')", true, 1);
+
+        checkSizeModificationByOperation("merge into person(id, name) 
values(1, 'a')", false, 0);
+
+        checkSizeModificationByOperation(
+            "insert into person values(1, 'a')", "merge into person(id, name) 
values(1, 'b')", true, 0);
+
+        checkSizeModificationByOperation("update person set name = 'b' where 
id = 1", true, 0);
+
+        checkSizeModificationByOperation(
+            "insert into person values(1, 'a')", "update person set name = 'b' 
where id = 1", true, 0);
+
+        checkSizeModificationByOperation(
+            "insert into person values(1, 'a')", "delete from person where id 
= 1", true, -1);
+
+        checkSizeModificationByOperation(
+            "insert into person values(1, 'a')", "delete from person where id 
= 1", false, 0);
+
+        checkSizeModificationByOperation("delete from person where id = 1", 
true, 0);
+
+        checkSizeModificationByOperation(
+            "insert into person values(1, 'a')", "select * from person", true, 
0);
+
+        checkSizeModificationByOperation("select * from person", true, 0);
+
+        checkSizeModificationByOperation(
+            "insert into person values(1, 'a')", "select * from person where 
id = 1 for update", true, 0);
+
+        checkSizeModificationByOperation("select * from person where id = 1 
for update", true, 0);
+
+        checkSizeModificationByOperation(personTbl -> {
+            personTbl.query(q("insert into person values(1, 'a')"));
+
+            personTbl.query(q("insert into person values(%d, 'b')", 
keyInSamePartition(grid(0), "person", 1)));
+
+            personTbl.query(q("insert into person values(%d, 'c')", 
keyInDifferentPartition(grid(0), "person", 1)));
+        }, true, 3);
+
+        checkSizeModificationByOperation(personTbl -> {
+            personTbl.query(q("insert into person values(1, 'a')"));
+
+            personTbl.query(q("delete from person where id = 1"));
+        }, true, 0);
+
+        checkSizeModificationByOperation(personTbl -> {
+            personTbl.query(q("insert into person values(1, 'a')"));
+
+            personTbl.query(q("delete from person where id = 1"));
+
+            personTbl.query(q("insert into person values(1, 'a')"));
+        }, true, 1);
+
+        checkSizeModificationByOperation(
+            personTbl -> personTbl.query(q("insert into person values(1, 
'a')")),
+            personTbl -> {
+                personTbl.query(q("delete from person where id = 1"));
+
+                personTbl.query(q("insert into person values(1, 'a')"));
+            }, true, 0);
+
+        checkSizeModificationByOperation(personTbl -> {
+            personTbl.query(q("merge into person(id, name) values(1, 'a')"));
+
+            personTbl.query(q("delete from person where id = 1"));
+        }, true, 0);
+
+        checkSizeModificationByOperation(personTbl -> {
+            personTbl.query(q("merge into person(id, name) values(1, 'a')"));
+
+            personTbl.query(q("delete from person where id = 1"));
+
+            personTbl.query(q("merge into person(id, name) values(1, 'a')"));
+        }, true, 1);
+
+        checkSizeModificationByOperation(
+            personTbl -> personTbl.query(q("merge into person(id, name) 
values(1, 'a')")),
+            personTbl -> {
+                personTbl.query(q("delete from person where id = 1"));
+
+                personTbl.query(q("merge into person(id, name) values(1, 
'a')"));
+            }, true, 0);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testInsertDeleteConcurrent() throws Exception {
+        startGridsMultiThreaded(2);
+
+        IgniteCache<?, ?> tbl0 = createTable(grid(0));
+
+        SqlFieldsQuery insert = new SqlFieldsQuery("insert into person(id, 
name) values(?, 'a')");
+
+        SqlFieldsQuery delete = new SqlFieldsQuery("delete from person where 
id = ?");
+
+        CompletableFuture<Integer> insertFut = 
CompletableFuture.supplyAsync(() -> {
+            int cnt = 0;
+
+            for (int i = 0; i < 1000; i++)
+                cnt += 
update(insert.setArgs(ThreadLocalRandom.current().nextInt(10)), tbl0);
+
+            return cnt;
+        });
+
+        CompletableFuture<Integer> deleteFut = 
CompletableFuture.supplyAsync(() -> {
+            int cnt = 0;
+
+            for (int i = 0; i < 1000; i++)
+                cnt += 
update(delete.setArgs(ThreadLocalRandom.current().nextInt(10)), tbl0);
+
+            return cnt;
+        });
+
+        int expSize = insertFut.join() - deleteFut.join();
+
+        assertEquals(expSize, tbl0.size());
+        assertEquals(expSize, table(grid(1)).size());
+
+        assertEquals(expSize, tbl0.size(BACKUP));
+        assertEquals(expSize, table(grid(1)).size(BACKUP));
+    }
+
+    /** */
+    private int update(SqlFieldsQuery qry, IgniteCache<?, ?> cache) {
+        try {
+            return 
Integer.parseInt(cache.query(qry).getAll().get(0).get(0).toString());
+        } catch (Exception e) {
+            return 0;
+        }
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testWriteConflictDoesNotChangeSize() throws Exception {
+        startGridsMultiThreaded(2);
+
+        IgniteCache<?, ?> tbl0 = createTable(grid(0));
+
+        tbl0.query(q("insert into person values(1, 'a')"));
+
+        tbl0.query(q("begin"));
+
+        tbl0.query(q("delete from person where id = 1"));
+
+        CompletableFuture<Void> conflictingStarted = new CompletableFuture<>();
+
+        CompletableFuture<Void> fut = CompletableFuture.runAsync(() -> {
+            tbl0.query(q("begin"));
+
+            try {
+                tbl0.query(q("select * from person")).getAll();
+                conflictingStarted.complete(null);
+
+                tbl0.query(q("merge into person(id, name) values(1, 'b')"));
+            }
+            finally {
+                tbl0.query(q("commit"));
+            }
+        });
+
+        conflictingStarted.join();
+        tbl0.query(q("commit"));
+
+        try {
+            fut.join();
+        }
+        catch (Exception e) {
+            if (e.getCause().getCause() instanceof IgniteSQLException)
+                assertTrue(e.getMessage().toLowerCase().contains("version 
mismatch"));
+            else {
+                e.printStackTrace();
+
+                fail("Unexpected exception");
+            }
+        }
+
+        assertEquals(0, tbl0.size());
+        assertEquals(0, table(grid(1)).size());
+
+        assertEquals(0, tbl0.size(BACKUP));
+        assertEquals(0, table(grid(1)).size(BACKUP));
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testDeleteChangesSizeAfterUnlock() throws Exception {
+        startGridsMultiThreaded(2);
+
+        IgniteCache<?, ?> tbl0 = createTable(grid(0));
+
+        tbl0.query(q("insert into person values(1, 'a')"));
+
+        tbl0.query(q("begin"));
+
+        tbl0.query(q("select * from person where id = 1 for update")).getAll();
+
+        CompletableFuture<Thread> asyncThread = new CompletableFuture<>();
+
+        CompletableFuture<Void> fut = CompletableFuture.runAsync(() -> {
+            tbl0.query(q("begin"));
+
+            try {
+                tbl0.query(q("select * from person")).getAll();
+
+                asyncThread.complete(Thread.currentThread());
+                tbl0.query(q("delete from person where id = 1"));
+            }
+            finally {
+                tbl0.query(q("commit"));
+            }
+        });
+
+        Thread concThread = asyncThread.join();
+
+        // wait until concurrent thread blocks awaiting entry mvcc lock release
+        while (concThread.getState() == Thread.State.RUNNABLE && 
!Thread.currentThread().isInterrupted());
+
+        tbl0.query(q("commit"));
+
+        fut.join();
+
+        assertEquals(0, tbl0.size());
+        assertEquals(0, table(grid(1)).size());
+
+        assertEquals(0, tbl0.size(BACKUP));
+        assertEquals(0, table(grid(1)).size(BACKUP));
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testDataStreamerModifiesReplicatedCacheSize() throws Exception 
{
+        startGridsMultiThreaded(2);
+
+        IgniteEx ignite = grid(0);
+
+        ignite.createCache(
+            new CacheConfiguration<>("test")
+                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+                .setCacheMode(CacheMode.REPLICATED)
+        );
+
+        try (IgniteDataStreamer<Object, Object> streamer = 
ignite.dataStreamer("test")) {
+            streamer.addData(1, "a");
+
+            streamer.addData(keyInDifferentPartition(ignite, "test", 1), "b");
+        }
+
+        assertEquals(2, ignite.cache("test").size());
+
+        assertEquals(1, grid(0).cache("test").localSize());
+        assertEquals(1, grid(0).cache("test").localSize(BACKUP));
+
+        assertEquals(1, grid(1).cache("test").localSize());
+        assertEquals(1, grid(1).cache("test").localSize(BACKUP));
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testSizeIsConsistentAfterRebalance() throws Exception {
+        IgniteEx ignite = startGrid(0);
+
+        IgniteCache<?, ?> tbl = createTable(ignite);
+
+        for (int i = 0; i < 100; i++)
+            tbl.query(q("insert into person values(?, ?)").setArgs(i, i));
+
+        startGrid(1);
+
+        awaitPartitionMapExchange();
+
+        IgniteCache<?, ?> tbl0 = grid(0).cache("person");
+        IgniteCache<?, ?> tbl1 = grid(1).cache("person");
+
+        assert tbl0.localSize() != 0 && tbl1.localSize() != 0;
+
+        assertEquals(100, tbl1.size());
+        assertEquals(100, tbl0.localSize() + tbl1.localSize());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSizeIsConsistentAfterRebalanceDuringInsert() throws 
Exception {
+        IgniteEx ignite = startGrid(0);
+
+        IgniteCache<?, ?> tbl = createTable(ignite);
+
+        Future<?> f = null;
+
+        for (int i = 0; i < 100; i++) {
+            if (i == 50)
+                f = ForkJoinPool.commonPool().submit(() -> startGrid(1));
+
+            tbl.query(q("insert into person values(?, ?)").setArgs(i, i));
+        }
+
+        f.get();
+
+        awaitPartitionMapExchange();
+
+        IgniteCache<?, ?> tbl0 = grid(0).cache("person");
+        IgniteCache<?, ?> tbl1 = grid(1).cache("person");
+
+        assert tbl0.localSize() != 0 && tbl1.localSize() != 0;
+
+        assertEquals(100, tbl1.size());
+        assertEquals(100, tbl0.localSize() + tbl1.localSize());
+    }
+
+    /** */
+    private static IgniteCache<?, ?> table(IgniteEx ignite) {
+        assert ignite.cachex("person").configuration().getAtomicityMode() == 
CacheAtomicityMode.TRANSACTIONAL;
+        assert ignite.cachex("person").configuration().getCacheMode() == 
CacheMode.REPLICATED;
+
+        return ignite.cache("person");
+    }
+
+    /** */
+    private static IgniteCache<?, ?> createTable(IgniteEx ignite) {
+        IgniteCache<?, ?> sqlNexus = ignite.getOrCreateCache(new 
CacheConfiguration<>("sqlNexus").setSqlSchema("PUBLIC"));
+
+        sqlNexus.query(q("" +
+            "create table person(" +
+            "  id int primary key," +
+            "  name varchar" +
+            ") with 
\"atomicity=transactional,template=replicated,cache_name=person\""));
+
+        return table(ignite);
+    }
+
+    /** */
+    private static SqlFieldsQuery q(String fSql, Object... args) {
+        return new SqlFieldsQuery(String.format(fSql, args));
+    }
+
+    /** */
+    private static int keyInSamePartition(Ignite ignite, String cacheName, int 
key) {
+        Affinity<Object> affinity = ignite.affinity(cacheName);
+
+        return IntStream.iterate(key + 1, i -> i + 1)
+            .filter(i -> affinity.partition(i) == affinity.partition(key))
+            .findFirst().getAsInt();
+    }
+
+    /** */
+    private static int keyInDifferentPartition(Ignite ignite, String 
cacheName, int key) {
+        Affinity<Object> affinity = ignite.affinity(cacheName);
+
+        return IntStream.iterate(key + 1, i -> i + 1)
+            .filter(i -> affinity.partition(i) != affinity.partition(key))
+            .findFirst().getAsInt();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00e935d3/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
index ab9d2e6..c8f7643 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
@@ -36,6 +36,7 @@ import 
org.apache.ignite.internal.processors.cache.mvcc.CacheMvccReplicatedSqlQu
 import 
org.apache.ignite.internal.processors.cache.mvcc.CacheMvccReplicatedSqlTxQueriesTest;
 import 
org.apache.ignite.internal.processors.cache.mvcc.CacheMvccReplicatedSqlTxQueriesWithReducerTest;
 import 
org.apache.ignite.internal.processors.cache.mvcc.CacheMvccScanQueryWithConcurrentJdbcTransactionTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSizeTest;
 import 
org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSizeWithConcurrentJdbcTransactionTest;
 import 
org.apache.ignite.internal.processors.cache.mvcc.CacheMvccStreamingInsertTest;
 import 
org.apache.ignite.internal.processors.query.h2.GridIndexRebuildWithMvccEnabledSelfTest;
@@ -50,9 +51,10 @@ public class IgniteCacheMvccSqlTestSuite extends TestSuite {
     public static TestSuite suite() {
         TestSuite suite = new TestSuite("IgniteCache SQL MVCC Test Suite");
 
-        // Simle tests.
+        // Simple tests.
         suite.addTestSuite(CacheMvccDmlSimpleTest.class);
         
suite.addTestSuite(SqlTransactionsCommandsWithMvccEnabledSelfTest.class);
+        suite.addTestSuite(CacheMvccSizeTest.class);
 
         suite.addTestSuite(GridIndexRebuildWithMvccEnabledSelfTest.class);
 
@@ -83,7 +85,6 @@ public class IgniteCacheMvccSqlTestSuite extends TestSuite {
         
suite.addTestSuite(CacheMvccPartitionedSqlCoordinatorFailoverTest.class);
         
suite.addTestSuite(CacheMvccReplicatedSqlCoordinatorFailoverTest.class);
 
-
         return suite;
     }
 }

Reply via email to