http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index b9ab76a..f160549 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -49,6 +49,7 @@ import 
org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
 import 
org.apache.ignite.internal.processors.authentication.IgniteAuthenticationProcessor;
 import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
 import 
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor;
 import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
 import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFoldersResolver;
 import 
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
@@ -467,6 +468,11 @@ public class StandaloneGridKernalContext implements 
GridKernalContext {
     }
 
     /** {@inheritDoc} */
+    @Override public MvccProcessor coordinators() {
+        return null; // This context never supplies an MVCC processor; callers receive null.
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean invalid() {
         return false;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/RecordTypes.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/RecordTypes.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/RecordTypes.java
index 1807d1d..65f0aae 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/RecordTypes.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/RecordTypes.java
@@ -35,6 +35,9 @@ public final class RecordTypes {
         
DELTA_TYPE_SET.add(WALRecord.RecordType.DATA_PAGE_INSERT_FRAGMENT_RECORD);
         DELTA_TYPE_SET.add(WALRecord.RecordType.DATA_PAGE_REMOVE_RECORD);
         DELTA_TYPE_SET.add(WALRecord.RecordType.DATA_PAGE_SET_FREE_LIST_PAGE);
+        
DELTA_TYPE_SET.add(WALRecord.RecordType.MVCC_DATA_PAGE_MARK_UPDATED_RECORD);
+        
DELTA_TYPE_SET.add(WALRecord.RecordType.MVCC_DATA_PAGE_TX_STATE_HINT_UPDATED_RECORD);
+        
DELTA_TYPE_SET.add(WALRecord.RecordType.MVCC_DATA_PAGE_NEW_TX_STATE_HINT_UPDATED_RECORD);
         DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_META_PAGE_INIT_ROOT);
         DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_META_PAGE_ADD_ROOT);
         DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_META_PAGE_CUT_ROOT);

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
index ad06090..aa89c5a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
@@ -41,6 +41,8 @@ import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import 
org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord;
 import 
org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertRecord;
+import 
org.apache.ignite.internal.pagemem.wal.record.delta.DataPageMvccUpdateNewTxStateHintRecord;
+import 
org.apache.ignite.internal.pagemem.wal.record.delta.DataPageMvccUpdateTxStateHintRecord;
 import 
org.apache.ignite.internal.pagemem.wal.record.delta.DataPageRemoveRecord;
 import 
org.apache.ignite.internal.pagemem.wal.record.delta.DataPageSetFreeListPageRecord;
 import 
org.apache.ignite.internal.pagemem.wal.record.delta.DataPageUpdateRecord;
@@ -61,6 +63,7 @@ import 
org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateLastSuc
 import 
org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateLastSuccessfulSnapshotId;
 import 
org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateNextSnapshotId;
 import 
org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecord;
+import 
org.apache.ignite.internal.pagemem.wal.record.delta.DataPageMvccMarkUpdatedRecord;
 import org.apache.ignite.internal.pagemem.wal.record.delta.NewRootInitRecord;
 import 
org.apache.ignite.internal.pagemem.wal.record.delta.PageListMetaResetCountRecord;
 import 
org.apache.ignite.internal.pagemem.wal.record.delta.PagesListAddPageRecord;
@@ -194,6 +197,15 @@ public class RecordDataV1Serializer implements 
RecordDataSerializer {
             case DATA_PAGE_SET_FREE_LIST_PAGE:
                 return 4 + 8 + 8;
 
+            case MVCC_DATA_PAGE_MARK_UPDATED_RECORD:
+                return 4 + 8 + 4 + 8 + 8 + 4;
+
+            case MVCC_DATA_PAGE_TX_STATE_HINT_UPDATED_RECORD:
+                return 4 + 8 + 4 + 1;
+
+            case MVCC_DATA_PAGE_NEW_TX_STATE_HINT_UPDATED_RECORD:
+                return 4 + 8 + 4 + 1;
+
             case INIT_NEW_PAGE_RECORD:
                 return 4 + 8 + 2 + 2 + 8;
 
@@ -504,6 +516,41 @@ public class RecordDataV1Serializer implements 
RecordDataSerializer {
 
                 break;
 
+            case MVCC_DATA_PAGE_MARK_UPDATED_RECORD:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                itemId = in.readInt();
+                long newMvccCrd = in.readLong();
+                long newMvccCntr = in.readLong();
+                int newMvccOpCntr = in.readInt();
+
+                res = new DataPageMvccMarkUpdatedRecord(cacheId, pageId, 
itemId, newMvccCrd, newMvccCntr, newMvccOpCntr);
+
+                break;
+
+            case MVCC_DATA_PAGE_TX_STATE_HINT_UPDATED_RECORD:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                itemId = in.readInt();
+                byte txState = in.readByte();
+
+                res = new DataPageMvccUpdateTxStateHintRecord(cacheId, pageId, 
itemId, txState);
+
+                break;
+
+            case MVCC_DATA_PAGE_NEW_TX_STATE_HINT_UPDATED_RECORD:
+                cacheId = in.readInt();
+                pageId = in.readLong();
+
+                itemId = in.readInt();
+                byte newTxState = in.readByte();
+
+                res = new DataPageMvccUpdateNewTxStateHintRecord(cacheId, 
pageId, itemId, newTxState);
+
+                break;
+
             case INIT_NEW_PAGE_RECORD:
                 cacheId = in.readInt();
                 pageId = in.readLong();
@@ -1035,6 +1082,41 @@ public class RecordDataV1Serializer implements 
RecordDataSerializer {
 
                 break;
 
+            case MVCC_DATA_PAGE_MARK_UPDATED_RECORD:
+                DataPageMvccMarkUpdatedRecord rmvRec = 
(DataPageMvccMarkUpdatedRecord)rec;
+
+                buf.putInt(rmvRec.groupId());
+                buf.putLong(rmvRec.pageId());
+
+                buf.putInt(rmvRec.itemId());
+                buf.putLong(rmvRec.newMvccCrd());
+                buf.putLong(rmvRec.newMvccCntr());
+                buf.putInt(rmvRec.newMvccOpCntr());
+
+                break;
+
+            case MVCC_DATA_PAGE_TX_STATE_HINT_UPDATED_RECORD:
+                DataPageMvccUpdateTxStateHintRecord txStRec = 
(DataPageMvccUpdateTxStateHintRecord)rec;
+
+                buf.putInt(txStRec.groupId());
+                buf.putLong(txStRec.pageId());
+
+                buf.putInt(txStRec.itemId());
+                buf.put(txStRec.txState());
+
+                break;
+
+            case MVCC_DATA_PAGE_NEW_TX_STATE_HINT_UPDATED_RECORD:
+                DataPageMvccUpdateNewTxStateHintRecord newTxStRec = 
(DataPageMvccUpdateNewTxStateHintRecord)rec;
+
+                buf.putInt(newTxStRec.groupId());
+                buf.putLong(newTxStRec.pageId());
+
+                buf.putInt(newTxStRec.itemId());
+                buf.put(newTxStRec.txState());
+
+                break;
+
             case INIT_NEW_PAGE_RECORD:
                 InitNewPageRecord inpRec = (InitNewPageRecord)rec;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index aac1659..c5f64c9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -35,6 +35,7 @@ import 
org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedSet;
 import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
@@ -195,6 +196,9 @@ public class GridCacheDistributedQueryManager<K, V> extends 
GridCacheQueryManage
      */
     @SuppressWarnings("unchecked")
     @Override void processQueryRequest(UUID sndId, GridCacheQueryRequest req) {
+        assert req.mvccSnapshot() != null || !cctx.mvccEnabled() || 
req.cancel() ||
+            (req.type() == null && !req.fields()) : req; // Last assertion 
means next page request.
+
         if (req.cancel()) {
             cancelIds.add(new CancelMessageId(req.id(), sndId));
 
@@ -277,7 +281,8 @@ public class GridCacheDistributedQueryManager<K, V> extends 
GridCacheQueryManage
                 req.includeMetaData(),
                 req.keepBinary(),
                 req.subjectId(),
-                req.taskHash()
+                req.taskHash(),
+                req.mvccSnapshot()
             );
 
         return new GridCacheQueryInfo(
@@ -531,6 +536,8 @@ public class GridCacheDistributedQueryManager<K, V> extends 
GridCacheQueryManage
 
             String clsName = qry.query().queryClassName();
 
+            MvccSnapshot mvccSnapshot = qry.query().mvccSnapshot();
+
             final GridCacheQueryRequest req = new GridCacheQueryRequest(
                 cctx.cacheId(),
                 reqId,
@@ -551,6 +558,7 @@ public class GridCacheDistributedQueryManager<K, V> extends 
GridCacheQueryManage
                 qry.query().subjectId(),
                 qry.query().taskHash(),
                 queryTopologyVersion(),
+                mvccSnapshot,
                 // Force deployment anyway if scan query is used.
                 cctx.deploymentEnabled() || (qry.query().scanFilter() != null 
&& cctx.gridDeploy().enabled()));
 
@@ -581,6 +589,7 @@ public class GridCacheDistributedQueryManager<K, V> extends 
GridCacheQueryManage
         Collection<ClusterNode> nodes) throws IgniteCheckedException {
         assert !cctx.isLocal() : cctx.name();
         assert qry.type() == GridCacheQueryType.SCAN: qry;
+        assert qry.mvccSnapshot() != null || !cctx.mvccEnabled();
 
         GridCloseableIterator locIter0 = null;
 
@@ -606,7 +615,7 @@ public class GridCacheDistributedQueryManager<K, V> extends 
GridCacheQueryManage
 
         final GridCacheQueryBean bean = new GridCacheQueryBean(qry, null, 
qry.<K, V>transform(), null);
 
-        final CacheQueryFuture fut = (CacheQueryFuture)queryDistributed(bean, 
nodes);
+        final CacheQueryFuture fut = queryDistributed(bean, nodes);
 
         return new GridCloseableIteratorAdapter() {
             /** */
@@ -749,6 +758,7 @@ public class GridCacheDistributedQueryManager<K, V> extends 
GridCacheQueryManage
                 qry.query().subjectId(),
                 qry.query().taskHash(),
                 queryTopologyVersion(),
+                null,
                 cctx.deploymentEnabled());
 
             addQueryFuture(req.id(), fut);

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 51fdd58..f21a22f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -29,6 +30,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.query.Query;
@@ -43,6 +45,9 @@ import 
org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
 import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
@@ -60,6 +65,7 @@ import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteReducer;
 import org.apache.ignite.plugin.security.SecurityPermission;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheMode.LOCAL;
@@ -127,6 +133,9 @@ public class GridCacheQueryAdapter<T> implements 
CacheQuery<T> {
     /** */
     private int taskHash;
 
+    /** */
+    private MvccSnapshot mvccSnapshot;
+
     /**
      * @param cctx Context.
      * @param type Query type.
@@ -213,6 +222,7 @@ public class GridCacheQueryAdapter<T> implements 
CacheQuery<T> {
      * @param keepBinary Keep binary flag.
      * @param subjId Security subject ID.
      * @param taskHash Task hash.
+     * @param mvccSnapshot MVCC snapshot.
      */
     public GridCacheQueryAdapter(GridCacheContext<?, ?> cctx,
         GridCacheQueryType type,
@@ -229,7 +239,8 @@ public class GridCacheQueryAdapter<T> implements 
CacheQuery<T> {
         boolean incMeta,
         boolean keepBinary,
         UUID subjId,
-        int taskHash) {
+        int taskHash,
+        MvccSnapshot mvccSnapshot) {
         this.cctx = cctx;
         this.type = type;
         this.log = log;
@@ -246,6 +257,14 @@ public class GridCacheQueryAdapter<T> implements 
CacheQuery<T> {
         this.keepBinary = keepBinary;
         this.subjId = subjId;
         this.taskHash = taskHash;
+        this.mvccSnapshot = mvccSnapshot;
+    }
+
+    /**
+     * @return MVCC snapshot assigned to this query, or {@code null} if none has been set.
+     */
+    @Nullable MvccSnapshot mvccSnapshot() {
+        return mvccSnapshot;
     }
 
     /**
@@ -400,7 +419,7 @@ public class GridCacheQueryAdapter<T> implements 
CacheQuery<T> {
      */
     @SuppressWarnings("unchecked")
     @Nullable public <K, V> IgniteClosure<Map.Entry<K, V>, Object> transform() 
{
-        return (IgniteClosure<Map.Entry<K, V>, Object>) transform;
+        return (IgniteClosure<Map.Entry<K, V>, Object>)transform;
     }
 
     /**
@@ -519,15 +538,23 @@ public class GridCacheQueryAdapter<T> implements 
CacheQuery<T> {
 
         final GridCacheQueryManager qryMgr = cctx.queries();
 
+        MvccQueryTracker mvccTracker = null;
+
+        if (cctx.mvccEnabled() && mvccSnapshot == null)
+            mvccSnapshot = (mvccTracker = MvccUtils.mvccTracker(cctx, 
false)).snapshot();
+
         boolean loc = nodes.size() == 1 && 
F.first(nodes).id().equals(cctx.localNodeId());
 
-        if (loc)
-            return qryMgr.scanQueryLocal(this, true);
+        GridCloseableIterator it;
 
-        if (part != null)
-            return new ScanQueryFallbackClosableIterator(part, this, qryMgr, 
cctx);
+        if (loc)
+            it = qryMgr.scanQueryLocal(this, true);
+        else if (part != null)
+            it = new ScanQueryFallbackClosableIterator(part, this, qryMgr, 
cctx);
         else
-            return qryMgr.scanQueryDistributed(this, nodes);
+            it = qryMgr.scanQueryDistributed(this, nodes);
+
+        return mvccTracker != null ? new MvccTrackingIterator(it, mvccTracker) 
: it;
     }
 
     /**
@@ -781,7 +808,7 @@ public class GridCacheQueryAdapter<T> implements 
CacheQuery<T> {
          * @return Cache entry
          */
         private Object convert(Object obj) {
-            if(qry.transform() != null)
+            if (qry.transform() != null)
                 return obj;
 
             Map.Entry e = (Map.Entry)obj;
@@ -853,4 +880,93 @@ public class GridCacheQueryAdapter<T> implements 
CacheQuery<T> {
                 t.get2().cancel();
         }
     }
+
+    /**
+     * Wraps a query iterator so that the MVCC query tracker is notified once the iterator is exhausted or closed.
+     */
+    private static class MvccTrackingIterator implements GridCloseableIterator {
+        /** Serial version uid. */
+        private static final long serialVersionUID = -1905248502802333832L;
+        /** Underlying iterator that every call is delegated to. */
+        private final GridCloseableIterator it;
+
+        /** Query MVCC tracker; {@code onDone()} is invoked on it when this wrapper is closed. */
+        private final MvccQueryTracker mvccTracker;
+
+        /**
+         * Constructor.
+         *
+         * @param it Underlying iterator, must not be {@code null}.
+         * @param mvccTracker Query MVCC tracker, must not be {@code null}.
+         */
+        MvccTrackingIterator(GridCloseableIterator it, MvccQueryTracker mvccTracker) {
+            assert it != null && mvccTracker != null;
+
+            this.it = it;
+            this.mvccTracker = mvccTracker;
+        }
+
+        /** Closes the underlying iterator, then calls {@code onDone()} on the tracker even if closing failed. */
+        @Override public void close() throws IgniteCheckedException {
+            if (isClosed()) // Closed state is the underlying iterator's state, so a second close is a no-op.
+                return;
+
+            try {
+                it.close();
+            }
+            finally {
+                mvccTracker.onDone();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isClosed() {
+            return it.isClosed();
+        }
+
+        /** Closes this wrapper as soon as the underlying iterator reports no more elements. */
+        @Override public boolean hasNext() {
+            boolean hasNext = it.hasNext();
+
+            if (!hasNext)
+                try {
+                    close();
+                }
+                catch (IgniteCheckedException e) { // Unchecked signature: rethrow with the cause preserved.
+                    throw new IgniteException(e);
+                }
+
+            return hasNext;
+        }
+
+        /** Checked counterpart of {@link #hasNext()}; failures propagate as {@link IgniteCheckedException}. */
+        @Override public boolean hasNextX() throws IgniteCheckedException {
+            boolean hasNext = it.hasNextX(); // Use the checked variant, matching nextX() and removeX().
+
+            if (!hasNext)
+                close();
+
+            return hasNext;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object nextX() throws IgniteCheckedException {
+            return it.nextX();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void removeX() throws IgniteCheckedException {
+            it.removeX();
+        }
+
+        /** {@inheritDoc} */
+        @NotNull @Override public Iterator iterator() {
+            return this;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object next() {
+            return it.next();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 37bdb80..c209602 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -380,8 +380,8 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
      * @param prevRowAvailable Whether previous row is available.
      * @throws IgniteCheckedException In case of error.
      */
-    public void store(CacheDataRow newRow, @Nullable CacheDataRow prevRow, 
boolean prevRowAvailable)
-        throws IgniteCheckedException {
+    public void store(CacheDataRow newRow, @Nullable CacheDataRow prevRow,
+        boolean prevRowAvailable) throws IgniteCheckedException {
         assert enabled();
         assert newRow != null && newRow.value() != null && newRow.link() != 0 
: newRow;
 
@@ -414,7 +414,8 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
      * @param prevRow Previous row.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void remove(KeyCacheObject key, @Nullable CacheDataRow prevRow) 
throws IgniteCheckedException {
+    public void remove(KeyCacheObject key, @Nullable CacheDataRow prevRow)
+        throws IgniteCheckedException {
         if (!QueryUtils.isEnabled(cctx.config()))
             return; // No-op.
 
@@ -803,6 +804,8 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
     private GridCloseableIterator scanIterator(final GridCacheQueryAdapter<?> 
qry, IgniteClosure transformer,
         boolean locNode)
         throws IgniteCheckedException {
+        assert !cctx.mvccEnabled() || qry.mvccSnapshot() != null;
+
         final IgniteBiPredicate<K, V> keyValFilter = qry.scanFilter();
         final InternalScanFilter<K,V> intFilter = keyValFilter != null ? new 
InternalScanFilter<>(keyValFilter) : null;
 
@@ -843,13 +846,12 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
 
                 locPart = locPart0;
 
-                it = cctx.offheap().cachePartitionIterator(cctx.cacheId(), 
part);
+                it = cctx.offheap().cachePartitionIterator(cctx.cacheId(), 
part, qry.mvccSnapshot());
             }
             else {
                 locPart = null;
 
-                // TODO shouldn't we reserve all involved partitions?
-                it = cctx.offheap().cacheIterator(cctx.cacheId(), true, 
backups, topVer);
+                it = cctx.offheap().cacheIterator(cctx.cacheId(), true, 
backups, topVer, qry.mvccSnapshot());
             }
 
             return new ScanQueryIterator(it, qry, topVer, locPart, 
keyValFilter, transformer, locNode, cctx, log);
@@ -2923,11 +2925,13 @@ public abstract class GridCacheQueryManager<K, V> 
extends GridCacheManagerAdapte
             boolean locNode,
             GridCacheContext cctx,
             IgniteLogger log) {
+
             this.it = it;
             this.topVer = topVer;
             this.locPart = locPart;
             this.intScanFilter = scanFilter != null ? new 
InternalScanFilter<>(scanFilter) : null;
             this.cctx = cctx;
+
             this.log = log;
             this.locNode = locNode;
 
@@ -2938,7 +2942,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
             readEvt = cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ) 
&&
                 cctx.gridEvents().hasListener(EVT_CACHE_QUERY_OBJECT_READ);
 
-            if(readEvt){
+            if (readEvt){
                 taskName = 
cctx.kernalContext().task().resolveTaskName(qry.taskHash());
                 subjId = qry.subjectId();
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 9dc7817..ebbca35 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -26,8 +26,8 @@ import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -129,6 +129,9 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
     /** */
     private AffinityTopologyVersion topVer;
 
+    /** */
+    private MvccSnapshot mvccSnapshot;
+
     /**
      * Required by {@link Externalizable}
      */
@@ -223,6 +226,7 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
      * @param subjId Subject ID.
      * @param taskHash Task name hash code.
      * @param topVer Topology version.
+     * @param mvccSnapshot Mvcc snapshot.
      * @param addDepInfo Deployment info flag.
      */
     public GridCacheQueryRequest(
@@ -245,6 +249,7 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
         UUID subjId,
         int taskHash,
         AffinityTopologyVersion topVer,
+        MvccSnapshot mvccSnapshot,
         boolean addDepInfo
     ) {
         assert type != null || fields;
@@ -270,9 +275,17 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
         this.subjId = subjId;
         this.taskHash = taskHash;
         this.topVer = topVer;
+        this.mvccSnapshot = mvccSnapshot;
         this.addDepInfo = addDepInfo;
     }
 
+    /**
+     * @return MVCC snapshot carried by this request, possibly {@code null}.
+     */
+    @Nullable MvccSnapshot mvccSnapshot() {
+        return mvccSnapshot;
+    }
+
     /** {@inheritDoc} */
     @Override public AffinityTopologyVersion topologyVersion() {
         return topVer != null ? topVer : AffinityTopologyVersion.NONE;
@@ -573,48 +586,54 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
                 writer.incrementState();
 
             case 15:
-                if (!writer.writeInt("pageSize", pageSize))
+                if (!writer.writeMessage("mvccSnapshot", mvccSnapshot))
                     return false;
 
                 writer.incrementState();
 
             case 16:
-                if (!writer.writeInt("part", part))
+                if (!writer.writeInt("pageSize", pageSize))
                     return false;
 
                 writer.incrementState();
 
             case 17:
-                if (!writer.writeByteArray("rdcBytes", rdcBytes))
+                if (!writer.writeInt("part", part))
                     return false;
 
                 writer.incrementState();
 
             case 18:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeByteArray("rdcBytes", rdcBytes))
                     return false;
 
                 writer.incrementState();
 
             case 19:
-                if (!writer.writeInt("taskHash", taskHash))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 20:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeInt("taskHash", taskHash))
                     return false;
 
                 writer.incrementState();
 
             case 21:
-                if (!writer.writeByteArray("transBytes", transBytes))
+                if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
             case 22:
+                if (!writer.writeByteArray("transBytes", transBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 23:
                 if (!writer.writeByte("type", type != null ? 
(byte)type.ordinal() : -1))
                     return false;
 
@@ -733,7 +752,7 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
                 reader.incrementState();
 
             case 15:
-                pageSize = reader.readInt("pageSize");
+                mvccSnapshot = reader.readMessage("mvccSnapshot");
 
                 if (!reader.isLastRead())
                     return false;
@@ -741,7 +760,7 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
                 reader.incrementState();
 
             case 16:
-                part = reader.readInt("part");
+                pageSize = reader.readInt("pageSize");
 
                 if (!reader.isLastRead())
                     return false;
@@ -749,7 +768,7 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
                 reader.incrementState();
 
             case 17:
-                rdcBytes = reader.readByteArray("rdcBytes");
+                part = reader.readInt("part");
 
                 if (!reader.isLastRead())
                     return false;
@@ -757,7 +776,7 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
                 reader.incrementState();
 
             case 18:
-                subjId = reader.readUuid("subjId");
+                rdcBytes = reader.readByteArray("rdcBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -765,7 +784,7 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
                 reader.incrementState();
 
             case 19:
-                taskHash = reader.readInt("taskHash");
+                subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -773,7 +792,7 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
                 reader.incrementState();
 
             case 20:
-                topVer = reader.readMessage("topVer");
+                taskHash = reader.readInt("taskHash");
 
                 if (!reader.isLastRead())
                     return false;
@@ -781,7 +800,7 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
                 reader.incrementState();
 
             case 21:
-                transBytes = reader.readByteArray("transBytes");
+                topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -789,6 +808,14 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
                 reader.incrementState();
 
             case 22:
+                transBytes = reader.readByteArray("transBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 23:
                 byte typeOrd;
 
                 typeOrd = reader.readByte("type");
@@ -812,7 +839,7 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 23;
+        return 24;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/IgniteQueryErrorCode.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/IgniteQueryErrorCode.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/IgniteQueryErrorCode.java
index 2547e1b..5dab5fd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/IgniteQueryErrorCode.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/IgniteQueryErrorCode.java
@@ -106,6 +106,20 @@ public final class IgniteQueryErrorCode {
     /** Attempt to INSERT, UPDATE or MERGE value that exceed maximum column 
length. */
     public final static int TOO_LONG_VALUE = 4008;
 
+    /* 5xxx - transactions related runtime errors. */
+
+    /** Transaction is already open. */
+    public final static int TRANSACTION_EXISTS = 5001;
+
+    /** MVCC disabled. */
+    public final static int MVCC_DISABLED = 5002;
+
+    /** Transaction type mismatch (SQL/non SQL). */
+    public final static int TRANSACTION_TYPE_MISMATCH = 5003;
+
+    /** Transaction is already completed. */
+    public final static int TRANSACTION_COMPLETED = 5004;
+
     /** */
     private IgniteQueryErrorCode() {
         // No-op.
@@ -159,6 +173,12 @@ public final class IgniteQueryErrorCode {
             case KEY_UPDATE:
                 return SqlStateCode.PARSING_EXCEPTION;
 
+            case MVCC_DISABLED:
+            case TRANSACTION_EXISTS:
+            case TRANSACTION_TYPE_MISMATCH:
+            case TRANSACTION_COMPLETED:
+                return SqlStateCode.TRANSACTION_STATE_EXCEPTION;
+
             default:
                 return SqlStateCode.INTERNAL_ERROR;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java
index ff10e3d..676e61c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.processors.query.NestedTxMode;
 import org.apache.ignite.internal.util.typedef.F;
 
 /**
@@ -36,6 +37,12 @@ public final class SqlFieldsQueryEx extends SqlFieldsQuery {
     /** Whether server side DML should be enabled. */
     private boolean skipReducerOnUpdate;
 
+    /** Auto commit flag. */
+    private boolean autoCommit = true;
+
+    /** Nested transactions handling mode. */
+    private NestedTxMode nestedTxMode = NestedTxMode.DEFAULT;
+
     /** Batched arguments list. */
     private List<Object[]> batchedArgs;
 
@@ -57,6 +64,8 @@ public final class SqlFieldsQueryEx extends SqlFieldsQuery {
 
         this.isQry = qry.isQry;
         this.skipReducerOnUpdate = qry.skipReducerOnUpdate;
+        this.autoCommit = qry.autoCommit;
+        this.nestedTxMode = qry.nestedTxMode;
         this.batchedArgs = qry.batchedArgs;
     }
 
@@ -159,6 +168,36 @@ public final class SqlFieldsQueryEx extends SqlFieldsQuery 
{
         return skipReducerOnUpdate;
     }
 
+    /**
+     * @return Nested transactions handling mode - behavior when the user 
attempts to open a transaction in scope of
+     * another transaction.
+     */
+    public NestedTxMode getNestedTxMode() {
+        return nestedTxMode;
+    }
+
+    /**
+     * @param nestedTxMode Nested transactions handling mode - behavior when 
the user attempts to open a transaction
+     * in scope of another transaction.
+     */
+    public void setNestedTxMode(NestedTxMode nestedTxMode) {
+        this.nestedTxMode = nestedTxMode;
+    }
+
+    /**
+     * @return Auto commit flag.
+     */
+    public boolean isAutoCommit() {
+        return autoCommit;
+    }
+
+    /**
+     * @param autoCommit Auto commit flag.
+     */
+    public void setAutoCommit(boolean autoCommit) {
+        this.autoCommit = autoCommit;
+    }
+
     /** {@inheritDoc} */
     @Override public SqlFieldsQuery copy() {
         return new SqlFieldsQueryEx(this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 4005dd8..996e7f4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -29,6 +29,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -76,7 +77,6 @@ import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.resources.LoggerResource;
 import org.jetbrains.annotations.Nullable;
-import java.util.concurrent.ConcurrentHashMap;
 
 import static javax.cache.event.EventType.CREATED;
 import static javax.cache.event.EventType.EXPIRED;
@@ -481,6 +481,11 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
         final boolean keepBinary,
         final boolean includeExpired) throws IgniteCheckedException
     {
+        //TODO IGNITE-7953
+        if (!cctx.atomic() && cctx.kernalContext().config().isMvccEnabled())
+            throw new UnsupportedOperationException("Continuous queries are 
not supported for transactional caches " +
+                "when MVCC is enabled.");
+
         IgniteOutClosure<CacheContinuousQueryHandler> clsr;
 
         if (rmtTransFactory != null) {
@@ -741,7 +746,8 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
             final Iterator<CacheDataRow> it = 
cctx.offheap().cacheIterator(cctx.cacheId(),
                 true,
                 true,
-                AffinityTopologyVersion.NONE);
+                AffinityTopologyVersion.NONE,
+                null);
 
             locLsnr.onUpdated(new Iterable<CacheEntryEvent>() {
                 @Override public Iterator<CacheEntryEvent> iterator() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 9e06d9d..4acf078 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -30,6 +30,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import 
org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
 import org.apache.ignite.internal.util.lang.GridTuple;
@@ -262,6 +263,11 @@ public interface IgniteInternalTx {
     public boolean activeCachesDeploymentEnabled();
 
     /**
+     * @param depEnabled Flag indicating whether deployment is enabled for 
caches from this transaction or not.
+     */
+    public void activeCachesDeploymentEnabled(boolean depEnabled);
+
+    /**
      * Attempts to set topology version and returns the current value.
      * If topology version was previously set, then it's value will
      * be returned (but not updated).
@@ -634,4 +640,14 @@ public interface IgniteInternalTx {
      * @param e Commit error.
      */
     public void commitError(Throwable e);
+
+    /**
+     * @param mvccSnapshot Mvcc snapshot.
+     */
+    public void mvccSnapshot(MvccSnapshot mvccSnapshot);
+
+    /**
+     * @return Mvcc snapshot.
+     */
+    public MvccSnapshot mvccSnapshot();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
index 25ba849..11bf219 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
@@ -31,9 +31,9 @@ import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionException;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.apache.ignite.transactions.TransactionMetrics;
-import org.apache.ignite.transactions.TransactionException;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -48,6 +48,7 @@ public class IgniteTransactionsImpl<K, V> implements 
IgniteTransactionsEx {
 
     /**
      * @param cctx Cache shared context.
+     * @param lb Label.
      */
     public IgniteTransactionsImpl(GridCacheSharedContext<K, V> cctx, @Nullable 
String lb) {
         this.cctx = cctx;
@@ -175,6 +176,7 @@ public class IgniteTransactionsImpl<K, V> implements 
IgniteTransactionsEx {
                 isolation,
                 timeout,
                 true,
+                null,
                 txSize,
                 lb
             );

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index bdd0c53..ee5a58e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -53,6 +53,7 @@ import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
 import org.apache.ignite.internal.processors.cache.CacheLazyEntry;
 import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheOperationContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
@@ -62,6 +63,8 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
+import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
 import 
org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry;
@@ -71,6 +74,7 @@ import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEnt
 import org.apache.ignite.internal.processors.cluster.BaselineTopology;
 import 
org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
+import org.apache.ignite.internal.util.GridIntIterator;
 import org.apache.ignite.internal.util.GridSetWrapper;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
@@ -84,6 +88,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.thread.IgniteThread;
 import org.apache.ignite.transactions.TransactionConcurrency;
@@ -263,6 +268,13 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
     /** UUID to consistent id mapper. */
     protected ConsistentIdMapper consistentIdMapper;
 
+    /** Mvcc tx update snapshot. */
+    protected volatile MvccSnapshot mvccSnapshot;
+
+    /** Rollback finish future. */
+    @GridToStringExclude
+    private volatile IgniteInternalFuture rollbackFut;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -382,6 +394,18 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
     }
 
     /**
+     * @return Mvcc snapshot.
+     */
+    @Override @Nullable public MvccSnapshot mvccSnapshot() {
+        return mvccSnapshot;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void mvccSnapshot(MvccSnapshot mvccSnapshot) {
+        this.mvccSnapshot = mvccSnapshot;
+    }
+
+    /**
      * @return Shared cache context.
      */
     public GridCacheSharedContext<?, ?> context() {
@@ -687,6 +711,20 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
     }
 
     /**
+     * @return Rollback future.
+     */
+    public IgniteInternalFuture rollbackFuture() {
+        return rollbackFut;
+    }
+
+    /**
+     * @param fut Rollback future.
+     */
+    public void rollbackFuture(IgniteInternalFuture fut) {
+        rollbackFut = fut;
+    }
+
+    /**
      * Gets remaining allowed transaction time.
      *
      * @return Remaining transaction time. {@code 0} if timeout isn't 
specified. {@code -1} if time is out.
@@ -1109,9 +1147,62 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
                 if (state != ACTIVE && state != SUSPENDED)
                     seal();
 
-                if (cctx.wal() != null && cctx.tm().logTxRecords() && txNodes 
!= null) {
+                if (state == PREPARED || state == COMMITTED || state == 
ROLLED_BACK) {
+                    if (mvccSnapshot != null) {
+                        byte txState;
+
+                        switch (state) {
+                            case PREPARED:
+                                txState = TxState.PREPARED;
+                                break;
+                            case ROLLED_BACK:
+                                txState = TxState.ABORTED;
+                                break;
+                            case COMMITTED:
+                                txState = TxState.COMMITTED;
+                                break;
+                            default:
+                                throw new IllegalStateException("Illegal 
state: " + state);
+                        }
+
+                        try {
+                            if (!cctx.localNode().isClient()) {
+                                if (dht() && remote())
+                                    
cctx.coordinators().updateState(mvccSnapshot, txState, false);
+                                else if (local()) {
+                                    IgniteInternalFuture<?> rollbackFut = 
rollbackFuture();
+
+                                    boolean syncUpdate = txState == 
TxState.PREPARED || txState == TxState.COMMITTED ||
+                                        rollbackFut == null || 
rollbackFut.isDone();
+
+                                    if (syncUpdate)
+                                        
cctx.coordinators().updateState(mvccSnapshot, txState);
+                                    else {
+                                        // If tx was aborted, we need to wait 
until the tx log is updated on all backups.
+                                        rollbackFut.listen(new 
IgniteInClosure<IgniteInternalFuture>() {
+                                            @Override public void 
apply(IgniteInternalFuture fut) {
+                                                try {
+                                                    
cctx.coordinators().updateState(mvccSnapshot, txState);
+                                                }
+                                                catch (IgniteCheckedException 
e) {
+                                                    U.error(log, "Failed to 
log TxState: " + txState, e);
+                                                }
+                                            }
+                                        });
+                                    }
+                                }
+                            }
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to log TxState: " + txState, 
e);
+
+                            throw new IgniteException("Failed to log TxState: 
" + txState, e);
+                        }
+                    }
+
                     // Log tx state change to WAL.
-                    if (state == PREPARED || state == COMMITTED || state == 
ROLLED_BACK) {
+                    if (cctx.wal() != null && cctx.tm().logTxRecords() && 
txNodes != null) {
+
                         BaselineTopology baselineTop = 
cctx.kernalContext().state().clusterState().baselineTopology();
 
                         Map<Short, Collection<Short>> participatingNodes = 
consistentIdMapper
@@ -1569,7 +1660,8 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
                     /*closure name */recordEvt ? 
F.first(txEntry.entryProcessors()).get1() : null,
                     resolveTaskName(),
                     null,
-                    keepBinary);
+                    keepBinary,
+                    null); // TODO IGNITE-7371
             }
 
             boolean modified = false;
@@ -1757,6 +1849,32 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
     }
 
     /**
+     * Notify Dr on tx finished.
+     *
+     * @param commit {@code True} if committed, {@code False} otherwise.
+     */
+    protected void notifyDrManager(boolean commit) {
+        if (system() || internal())
+            return;
+
+        IgniteTxState txState = txState();
+
+        if (mvccSnapshot == null || txState.cacheIds().isEmpty())
+            return;
+
+        GridIntIterator iter = txState.cacheIds().iterator();
+
+        while (iter.hasNext()) {
+            int cacheId = iter.next();
+
+            GridCacheContext ctx0 = cctx.cacheContext(cacheId);
+
+            if (ctx0.isDrEnabled())
+                ctx0.dr().onTxFinished(mvccSnapshot, commit, 
topologyVersionSnapshot());
+        }
+    }
+
+    /**
      * @param e Transaction entry.
      * @param primaryOnly Flag to include backups into check or not.
      * @return {@code True} if entry is locally mapped as a primary or back up 
node.
@@ -1886,6 +2004,16 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
         return xidVer.hashCode();
     }
 
+    /**
+     * Adds cache to the list of active caches in transaction.
+     *
+     * @param cacheCtx Cache context to add.
+     * @param recovery Recovery flag. See {@link 
CacheOperationContext#setRecovery(boolean)}.
+     * @throws IgniteCheckedException If caches already enlisted in this 
transaction are not compatible with given
+     *      cache (e.g. they have different stores).
+     */
+    public abstract void addActiveCache(GridCacheContext cacheCtx, boolean 
recovery) throws IgniteCheckedException;
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return GridToStringBuilder.toString(IgniteTxAdapter.class, this,
@@ -1960,6 +2088,16 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
         }
 
         /** {@inheritDoc} */
+        @Override public void mvccSnapshot(MvccSnapshot mvccSnapshot) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public MvccSnapshot mvccSnapshot() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
         @Override public boolean localResult() {
             return false;
         }
@@ -2040,6 +2178,11 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
         }
 
         /** {@inheritDoc} */
+        @Override public void activeCachesDeploymentEnabled(boolean 
depEnabled) {
+            throw new IllegalStateException("Deserialized transaction can only 
be used as read-only.");
+        }
+
+        /** {@inheritDoc} */
         @Nullable @Override public Object addMeta(int key, Object val) {
             throw new IllegalStateException("Deserialized transaction can only 
be used as read-only.");
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 2eba3ac..7541b43 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -342,8 +342,6 @@ public class IgniteTxHandler {
             return new GridFinishedFuture<>(e);
         }
 
-        assert firstEntry != null : req;
-
         GridDhtTxLocal tx = null;
 
         GridCacheVersion mappedVer = ctx.tm().mappedVersion(req.version());
@@ -363,6 +361,8 @@ public class IgniteTxHandler {
             GridDhtPartitionTopology top = null;
 
             if (req.firstClientRequest()) {
+                assert firstEntry != null : req;
+
                 assert req.concurrency() == OPTIMISTIC : req;
                 assert nearNode.isClient() : nearNode;
 
@@ -658,6 +658,8 @@ public class IgniteTxHandler {
         if (expVer.equals(curVer))
             return false;
 
+        // TODO IGNITE-6754 check mvcc crd for mvcc enabled txs.
+
         for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes())) {
             GridCacheContext ctx = e.context();
 
@@ -912,8 +914,11 @@ public class IgniteTxHandler {
         else
             tx = ctx.tm().tx(dhtVer);
 
-        if (tx != null)
+        if (tx != null) {
+            tx.mvccSnapshot(req.mvccSnapshot());
+
             req.txState(tx.txState());
+        }
 
         if (tx == null && locTx != null && !req.commit()) {
             U.warn(log, "DHT local tx not found for near local tx rollback " +
@@ -1378,6 +1383,7 @@ public class IgniteTxHandler {
                 tx.commitVersion(req.commitVersion());
                 tx.invalidate(req.isInvalidate());
                 tx.systemInvalidate(req.isSystemInvalidate());
+                tx.mvccSnapshot(req.mvccSnapshot());
 
                 // Complete remote candidates.
                 tx.doneRemote(req.baseVersion(), null, null, null);
@@ -1385,11 +1391,13 @@ public class IgniteTxHandler {
                 tx.setPartitionUpdateCounters(
                     req.partUpdateCounters() != null ? 
req.partUpdateCounters().array() : null);
 
+                tx.updateCountersMap(req.updateCountersMap());
+
                 tx.commitRemoteTx();
             }
             else {
                 tx.doneRemote(req.baseVersion(), null, null, null);
-
+                tx.mvccSnapshot(req.mvccSnapshot());
                 tx.rollbackRemoteTx();
             }
         }
@@ -1424,6 +1432,7 @@ public class IgniteTxHandler {
         try {
             tx.commitVersion(req.writeVersion());
             tx.invalidate(req.isInvalidate());
+            tx.mvccSnapshot(req.mvccSnapshot());
 
             // Complete remote candidates.
             tx.doneRemote(req.version(), null, null, null);
@@ -1608,10 +1617,12 @@ public class IgniteTxHandler {
         GridDhtTxPrepareRequest req,
         GridDhtTxPrepareResponse res
     ) throws IgniteCheckedException {
-        if (!F.isEmpty(req.writes())) {
+        if (req.queryUpdate() || !F.isEmpty(req.writes())) {
             GridDhtTxRemote tx = ctx.tm().tx(req.version());
 
             if (tx == null) {
+                assert !req.queryUpdate();
+
                 boolean single = req.last() && req.writes().size() == 1;
 
                 tx = new GridDhtTxRemote(
@@ -1718,7 +1729,8 @@ public class IgniteTxHandler {
                                                 /*transformClo*/null,
                                                 tx.resolveTaskName(),
                                                 /*expiryPlc*/null,
-                                                /*keepBinary*/true);
+                                                /*keepBinary*/true,
+                                                null); // TODO IGNITE-7371
 
                                             if (val == null)
                                                 val = 
cacheCtx.toCacheObject(cacheCtx.store().load(null, entry.key()));
@@ -1766,7 +1778,7 @@ public class IgniteTxHandler {
 
             res.invalidPartitionsByCacheId(tx.invalidPartitions());
 
-            if (tx.empty() && req.last()) {
+            if (!req.queryUpdate() && tx.empty() && req.last()) {
                 tx.skipCompletedVersions(req.skipCompletedVersion());
 
                 tx.rollbackRemoteTx();

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
index 10b06d8..4619a80 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
@@ -31,6 +31,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
+import org.apache.ignite.internal.util.GridIntList;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -57,9 +58,10 @@ public class IgniteTxImplicitSingleStateImpl extends 
IgniteTxLocalStateAdapter {
     private boolean recovery;
 
     /** {@inheritDoc} */
-    @Override public void addActiveCache(GridCacheContext ctx, boolean 
recovery, IgniteTxLocalAdapter tx)
+    @Override public void addActiveCache(GridCacheContext ctx, boolean 
recovery, IgniteTxAdapter tx)
         throws IgniteCheckedException {
         assert cacheCtx == null : "Cache already set [cur=" + cacheCtx.name() 
+ ", new=" + ctx.name() + ']';
+        assert tx.local();
 
         cacheCtx = ctx;
         this.recovery = recovery;
@@ -68,6 +70,11 @@ public class IgniteTxImplicitSingleStateImpl extends 
IgniteTxLocalStateAdapter {
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public GridIntList cacheIds() {
+        return  GridIntList.asList(cacheCtx.cacheId());
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public GridCacheContext 
singleCacheContext(GridCacheSharedContext cctx) {
         return cacheCtx;
     }
@@ -289,6 +296,13 @@ public class IgniteTxImplicitSingleStateImpl extends 
IgniteTxLocalStateAdapter {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean mvccEnabled(GridCacheSharedContext cctx) {
+        GridCacheContext ctx0 = cacheCtx;
+
+        return ctx0 != null && ctx0.mvccEnabled();
+    }
+
+    /** {@inheritDoc} */
     public String toString() {
         return S.toString(IgniteTxImplicitSingleStateImpl.class, this);
     }

Reply via email to