http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java index b9ab76a..f160549 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java @@ -49,6 +49,7 @@ import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor; import org.apache.ignite.internal.processors.authentication.IgniteAuthenticationProcessor; import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; +import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor; import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings; import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFoldersResolver; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; @@ -467,6 +468,11 @@ public class StandaloneGridKernalContext implements GridKernalContext { } /** {@inheritDoc} */ + @Override public MvccProcessor coordinators() { + return null; + } + + /** {@inheritDoc} */ @Override public boolean invalid() { return false; }
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/RecordTypes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/RecordTypes.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/RecordTypes.java index 1807d1d..65f0aae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/RecordTypes.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/RecordTypes.java @@ -35,6 +35,9 @@ public final class RecordTypes { DELTA_TYPE_SET.add(WALRecord.RecordType.DATA_PAGE_INSERT_FRAGMENT_RECORD); DELTA_TYPE_SET.add(WALRecord.RecordType.DATA_PAGE_REMOVE_RECORD); DELTA_TYPE_SET.add(WALRecord.RecordType.DATA_PAGE_SET_FREE_LIST_PAGE); + DELTA_TYPE_SET.add(WALRecord.RecordType.MVCC_DATA_PAGE_MARK_UPDATED_RECORD); + DELTA_TYPE_SET.add(WALRecord.RecordType.MVCC_DATA_PAGE_TX_STATE_HINT_UPDATED_RECORD); + DELTA_TYPE_SET.add(WALRecord.RecordType.MVCC_DATA_PAGE_NEW_TX_STATE_HINT_UPDATED_RECORD); DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_META_PAGE_INIT_ROOT); DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_META_PAGE_ADD_ROOT); DELTA_TYPE_SET.add(WALRecord.RecordType.BTREE_META_PAGE_CUT_ROOT); http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java index ad06090..aa89c5a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java @@ -41,6 +41,8 @@ import org.apache.ignite.internal.pagemem.wal.record.TxRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageMvccUpdateNewTxStateHintRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageMvccUpdateTxStateHintRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageRemoveRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageSetFreeListPageRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageUpdateRecord; @@ -61,6 +63,7 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateLastSuc import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateLastSuccessfulSnapshotId; import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateNextSnapshotId; import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageMvccMarkUpdatedRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.NewRootInitRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.PageListMetaResetCountRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListAddPageRecord; @@ -194,6 +197,15 @@ public class RecordDataV1Serializer implements RecordDataSerializer { case DATA_PAGE_SET_FREE_LIST_PAGE: return 4 + 8 + 8; + case MVCC_DATA_PAGE_MARK_UPDATED_RECORD: + return 4 + 8 + 4 + 8 + 8 + 4; + + case MVCC_DATA_PAGE_TX_STATE_HINT_UPDATED_RECORD: + return 4 + 8 + 4 + 1; + + case MVCC_DATA_PAGE_NEW_TX_STATE_HINT_UPDATED_RECORD: + return 4 + 8 + 4 + 1; + case INIT_NEW_PAGE_RECORD: return 4 + 8 + 2 + 2 + 8; @@ -504,6 +516,41 @@ public class RecordDataV1Serializer implements RecordDataSerializer { break; + case MVCC_DATA_PAGE_MARK_UPDATED_RECORD: + cacheId = in.readInt(); + pageId = in.readLong(); + + itemId = in.readInt(); + long newMvccCrd = in.readLong(); + long newMvccCntr = in.readLong(); + int newMvccOpCntr = in.readInt(); + + res = new DataPageMvccMarkUpdatedRecord(cacheId, pageId, itemId, newMvccCrd, newMvccCntr, newMvccOpCntr); + + break; + + case MVCC_DATA_PAGE_TX_STATE_HINT_UPDATED_RECORD: + cacheId = in.readInt(); + pageId = in.readLong(); + + itemId = in.readInt(); + byte txState = in.readByte(); + + res = new DataPageMvccUpdateTxStateHintRecord(cacheId, pageId, itemId, txState); + + break; + + case MVCC_DATA_PAGE_NEW_TX_STATE_HINT_UPDATED_RECORD: + cacheId = in.readInt(); + pageId = in.readLong(); + + itemId = in.readInt(); + byte newTxState = in.readByte(); + + res = new DataPageMvccUpdateNewTxStateHintRecord(cacheId, pageId, itemId, newTxState); + + break; + case INIT_NEW_PAGE_RECORD: cacheId = in.readInt(); pageId = in.readLong(); @@ -1035,6 +1082,41 @@ public class RecordDataV1Serializer implements RecordDataSerializer { break; + case MVCC_DATA_PAGE_MARK_UPDATED_RECORD: + DataPageMvccMarkUpdatedRecord rmvRec = (DataPageMvccMarkUpdatedRecord)rec; + + buf.putInt(rmvRec.groupId()); + buf.putLong(rmvRec.pageId()); + + buf.putInt(rmvRec.itemId()); + buf.putLong(rmvRec.newMvccCrd()); + buf.putLong(rmvRec.newMvccCntr()); + buf.putInt(rmvRec.newMvccOpCntr()); + + break; + + case MVCC_DATA_PAGE_TX_STATE_HINT_UPDATED_RECORD: + DataPageMvccUpdateTxStateHintRecord txStRec = (DataPageMvccUpdateTxStateHintRecord)rec; + + buf.putInt(txStRec.groupId()); + buf.putLong(txStRec.pageId()); + + buf.putInt(txStRec.itemId()); + buf.put(txStRec.txState()); + + break; + + case MVCC_DATA_PAGE_NEW_TX_STATE_HINT_UPDATED_RECORD: + DataPageMvccUpdateNewTxStateHintRecord newTxStRec = (DataPageMvccUpdateNewTxStateHintRecord)rec; + + buf.putInt(newTxStRec.groupId()); + buf.putLong(newTxStRec.pageId()); + + buf.putInt(newTxStRec.itemId()); + buf.put(newTxStRec.txState()); + + break; + case INIT_NEW_PAGE_RECORD: InitNewPageRecord inpRec = (InitNewPageRecord)rec; http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index aac1659..c5f64c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedSet; import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; @@ -195,6 +196,9 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage */ @SuppressWarnings("unchecked") @Override void processQueryRequest(UUID sndId, GridCacheQueryRequest req) { + assert req.mvccSnapshot() != null || !cctx.mvccEnabled() || req.cancel() || + (req.type() == null && !req.fields()) : req; // Last assertion means next page request. + if (req.cancel()) { cancelIds.add(new CancelMessageId(req.id(), sndId)); @@ -277,7 +281,8 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage req.includeMetaData(), req.keepBinary(), req.subjectId(), - req.taskHash() + req.taskHash(), + req.mvccSnapshot() ); return new GridCacheQueryInfo( @@ -531,6 +536,8 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage String clsName = qry.query().queryClassName(); + MvccSnapshot mvccSnapshot = qry.query().mvccSnapshot(); + final GridCacheQueryRequest req = new GridCacheQueryRequest( cctx.cacheId(), reqId, @@ -551,6 +558,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage qry.query().subjectId(), qry.query().taskHash(), queryTopologyVersion(), + mvccSnapshot, // Force deployment anyway if scan query is used. cctx.deploymentEnabled() || (qry.query().scanFilter() != null && cctx.gridDeploy().enabled())); @@ -581,6 +589,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage Collection<ClusterNode> nodes) throws IgniteCheckedException { assert !cctx.isLocal() : cctx.name(); assert qry.type() == GridCacheQueryType.SCAN: qry; + assert qry.mvccSnapshot() != null || !cctx.mvccEnabled(); GridCloseableIterator locIter0 = null; @@ -606,7 +615,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage final GridCacheQueryBean bean = new GridCacheQueryBean(qry, null, qry.<K, V>transform(), null); - final CacheQueryFuture fut = (CacheQueryFuture)queryDistributed(bean, nodes); + final CacheQueryFuture fut = queryDistributed(bean, nodes); return new GridCloseableIteratorAdapter() { /** */ @@ -749,6 +758,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage qry.query().subjectId(), qry.query().taskHash(), queryTopologyVersion(), + null, cctx.deploymentEnabled()); addQueryFuture(req.id(), fut); http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index 51fdd58..f21a22f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Deque; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedList; import java.util.Map; import java.util.NoSuchElementException; @@ -29,6 +30,7 @@ import java.util.Queue; import java.util.Set; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.query.Query; @@ -43,6 +45,9 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException; +import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker; +import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; +import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; import org.apache.ignite.internal.util.GridEmptyCloseableIterator; @@ -60,6 +65,7 @@ import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteReducer; import org.apache.ignite.plugin.security.SecurityPermission; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheMode.LOCAL; @@ -127,6 +133,9 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { /** */ private int taskHash; + /** */ + private MvccSnapshot mvccSnapshot; + /** * @param cctx Context. * @param type Query type. @@ -213,6 +222,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { * @param keepBinary Keep binary flag. * @param subjId Security subject ID. * @param taskHash Task hash. + * @param mvccSnapshot Mvcc version. */ public GridCacheQueryAdapter(GridCacheContext<?, ?> cctx, GridCacheQueryType type, @@ -229,7 +239,8 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { boolean incMeta, boolean keepBinary, UUID subjId, - int taskHash) { + int taskHash, + MvccSnapshot mvccSnapshot) { this.cctx = cctx; this.type = type; this.log = log; @@ -246,6 +257,14 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { this.keepBinary = keepBinary; this.subjId = subjId; this.taskHash = taskHash; + this.mvccSnapshot = mvccSnapshot; + } + + /** + * @return MVCC snapshot. + */ + @Nullable MvccSnapshot mvccSnapshot() { + return mvccSnapshot; } /** @@ -400,7 +419,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { */ @SuppressWarnings("unchecked") @Nullable public <K, V> IgniteClosure<Map.Entry<K, V>, Object> transform() { - return (IgniteClosure<Map.Entry<K, V>, Object>) transform; + return (IgniteClosure<Map.Entry<K, V>, Object>)transform; } /** @@ -519,15 +538,23 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { final GridCacheQueryManager qryMgr = cctx.queries(); + MvccQueryTracker mvccTracker = null; + + if (cctx.mvccEnabled() && mvccSnapshot == null) + mvccSnapshot = (mvccTracker = MvccUtils.mvccTracker(cctx, false)).snapshot(); + boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(cctx.localNodeId()); - if (loc) - return qryMgr.scanQueryLocal(this, true); + GridCloseableIterator it; - if (part != null) - return new ScanQueryFallbackClosableIterator(part, this, qryMgr, cctx); + if (loc) + it = qryMgr.scanQueryLocal(this, true); + else if (part != null) + it = new ScanQueryFallbackClosableIterator(part, this, qryMgr, cctx); else - return qryMgr.scanQueryDistributed(this, nodes); + it = qryMgr.scanQueryDistributed(this, nodes); + + return mvccTracker != null ? new MvccTrackingIterator(it, mvccTracker) : it; } /** @@ -781,7 +808,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { * @return Cache entry */ private Object convert(Object obj) { - if(qry.transform() != null) + if (qry.transform() != null) return obj; Map.Entry e = (Map.Entry)obj; @@ -853,4 +880,93 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { t.get2().cancel(); } } + + /** + * Wrapper for an MVCC-related iterators. + */ + private static class MvccTrackingIterator implements GridCloseableIterator { + /** Serial version uid. */ + private static final long serialVersionUID = -1905248502802333832L; + /** Underlying iterator. */ + private final GridCloseableIterator it; + + /** Query MVCC tracker. */ + private final MvccQueryTracker mvccTracker; + + /** + * Constructor. + * + * @param it Underlying iterator. + * @param mvccTracker Query MVCC tracker. + */ + MvccTrackingIterator(GridCloseableIterator it, MvccQueryTracker mvccTracker) { + assert it != null && mvccTracker != null; + + this.it = it; + this.mvccTracker = mvccTracker; + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteCheckedException { + if (isClosed()) + return; + + try { + it.close(); + } + finally { + mvccTracker.onDone(); + } + } + + /** {@inheritDoc} */ + @Override public boolean isClosed() { + return it.isClosed(); + } + + /** {@inheritDoc} */ + @Override public boolean hasNext() { + boolean hasNext = it.hasNext(); + + if (!hasNext) + try { + close(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + + return hasNext; + } + + /** {@inheritDoc} */ + @Override public boolean hasNextX() throws IgniteCheckedException { + boolean hasNext = it.hasNext(); + + if (!hasNext) + close(); + + return hasNext; + } + + /** {@inheritDoc} */ + @Override public Object nextX() throws IgniteCheckedException { + return it.nextX(); + } + + /** {@inheritDoc} */ + @Override public void removeX() throws IgniteCheckedException { + it.removeX(); + } + + /** {@inheritDoc} */ + @NotNull @Override public Iterator iterator() { + return this; + } + + /** {@inheritDoc} */ + @Override public Object next() { + return it.next(); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 37bdb80..c209602 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -380,8 +380,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @param prevRowAvailable Whether previous row is available. * @throws IgniteCheckedException In case of error. */ - public void store(CacheDataRow newRow, @Nullable CacheDataRow prevRow, boolean prevRowAvailable) - throws IgniteCheckedException { + public void store(CacheDataRow newRow, @Nullable CacheDataRow prevRow, + boolean prevRowAvailable) throws IgniteCheckedException { assert enabled(); assert newRow != null && newRow.value() != null && newRow.link() != 0 : newRow; @@ -414,7 +414,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @param prevRow Previous row. * @throws IgniteCheckedException Thrown in case of any errors. */ - public void remove(KeyCacheObject key, @Nullable CacheDataRow prevRow) throws IgniteCheckedException { + public void remove(KeyCacheObject key, @Nullable CacheDataRow prevRow) + throws IgniteCheckedException { if (!QueryUtils.isEnabled(cctx.config())) return; // No-op. @@ -803,6 +804,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte private GridCloseableIterator scanIterator(final GridCacheQueryAdapter<?> qry, IgniteClosure transformer, boolean locNode) throws IgniteCheckedException { + assert !cctx.mvccEnabled() || qry.mvccSnapshot() != null; + final IgniteBiPredicate<K, V> keyValFilter = qry.scanFilter(); final InternalScanFilter<K,V> intFilter = keyValFilter != null ? new InternalScanFilter<>(keyValFilter) : null; @@ -843,13 +846,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte locPart = locPart0; - it = cctx.offheap().cachePartitionIterator(cctx.cacheId(), part); + it = cctx.offheap().cachePartitionIterator(cctx.cacheId(), part, qry.mvccSnapshot()); } else { locPart = null; - // TODO shouldn't we reserve all involved partitions? - it = cctx.offheap().cacheIterator(cctx.cacheId(), true, backups, topVer); + it = cctx.offheap().cacheIterator(cctx.cacheId(), true, backups, topVer, qry.mvccSnapshot()); } return new ScanQueryIterator(it, qry, topVer, locPart, keyValFilter, transformer, locNode, cctx, log); @@ -2923,11 +2925,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte boolean locNode, GridCacheContext cctx, IgniteLogger log) { + this.it = it; this.topVer = topVer; this.locPart = locPart; this.intScanFilter = scanFilter != null ? new InternalScanFilter<>(scanFilter) : null; this.cctx = cctx; + this.log = log; this.locNode = locNode; @@ -2938,7 +2942,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte readEvt = cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ) && cctx.gridEvents().hasListener(EVT_CACHE_QUERY_OBJECT_READ); - if(readEvt){ + if (readEvt){ taskName = cctx.kernalContext().task().resolveTaskName(qry.taskHash()); subjId = qry.subjectId(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java index 9dc7817..ebbca35 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java @@ -26,8 +26,8 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -129,6 +129,9 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac /** */ private AffinityTopologyVersion topVer; + /** */ + private MvccSnapshot mvccSnapshot; + /** * Required by {@link Externalizable} */ @@ -223,6 +226,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac * @param subjId Subject ID. * @param taskHash Task name hash code. * @param topVer Topology version. + * @param mvccSnapshot Mvcc snapshot. * @param addDepInfo Deployment info flag. */ public GridCacheQueryRequest( @@ -245,6 +249,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac UUID subjId, int taskHash, AffinityTopologyVersion topVer, + MvccSnapshot mvccSnapshot, boolean addDepInfo ) { assert type != null || fields; @@ -270,9 +275,17 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac this.subjId = subjId; this.taskHash = taskHash; this.topVer = topVer; + this.mvccSnapshot = mvccSnapshot; this.addDepInfo = addDepInfo; } + /** + * @return Mvcc version. + */ + @Nullable MvccSnapshot mvccSnapshot() { + return mvccSnapshot; + } + /** {@inheritDoc} */ @Override public AffinityTopologyVersion topologyVersion() { return topVer != null ? topVer : AffinityTopologyVersion.NONE; @@ -573,48 +586,54 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac writer.incrementState(); case 15: - if (!writer.writeInt("pageSize", pageSize)) + if (!writer.writeMessage("mvccSnapshot", mvccSnapshot)) return false; writer.incrementState(); case 16: - if (!writer.writeInt("part", part)) + if (!writer.writeInt("pageSize", pageSize)) return false; writer.incrementState(); case 17: - if (!writer.writeByteArray("rdcBytes", rdcBytes)) + if (!writer.writeInt("part", part)) return false; writer.incrementState(); case 18: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeByteArray("rdcBytes", rdcBytes)) return false; writer.incrementState(); case 19: - if (!writer.writeInt("taskHash", taskHash)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 20: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeInt("taskHash", taskHash)) return false; writer.incrementState(); case 21: - if (!writer.writeByteArray("transBytes", transBytes)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); case 22: + if (!writer.writeByteArray("transBytes", transBytes)) + return false; + + writer.incrementState(); + + case 23: if (!writer.writeByte("type", type != null ? (byte)type.ordinal() : -1)) return false; @@ -733,7 +752,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac reader.incrementState(); case 15: - pageSize = reader.readInt("pageSize"); + mvccSnapshot = reader.readMessage("mvccSnapshot"); if (!reader.isLastRead()) return false; @@ -741,7 +760,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac reader.incrementState(); case 16: - part = reader.readInt("part"); + pageSize = reader.readInt("pageSize"); if (!reader.isLastRead()) return false; @@ -749,7 +768,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac reader.incrementState(); case 17: - rdcBytes = reader.readByteArray("rdcBytes"); + part = reader.readInt("part"); if (!reader.isLastRead()) return false; @@ -757,7 +776,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac reader.incrementState(); case 18: - subjId = reader.readUuid("subjId"); + rdcBytes = reader.readByteArray("rdcBytes"); if (!reader.isLastRead()) return false; @@ -765,7 +784,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac reader.incrementState(); case 19: - taskHash = reader.readInt("taskHash"); + subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) return false; @@ -773,7 +792,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac reader.incrementState(); case 20: - topVer = reader.readMessage("topVer"); + taskHash = reader.readInt("taskHash"); if (!reader.isLastRead()) return false; @@ -781,7 +800,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac reader.incrementState(); case 21: - transBytes = reader.readByteArray("transBytes"); + topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) return false; @@ -789,6 +808,14 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac reader.incrementState(); case 22: + transBytes = reader.readByteArray("transBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 23: byte typeOrd; typeOrd = reader.readByte("type"); @@ -812,7 +839,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 23; + return 24; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/IgniteQueryErrorCode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/IgniteQueryErrorCode.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/IgniteQueryErrorCode.java index 2547e1b..5dab5fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/IgniteQueryErrorCode.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/IgniteQueryErrorCode.java @@ -106,6 +106,20 @@ public final class IgniteQueryErrorCode { /** Attempt to INSERT, UPDATE or MERGE value that exceed maximum column length. */ public final static int TOO_LONG_VALUE = 4008; + /* 5xxx - transactions related runtime errors. */ + + /** Transaction is already open. */ + public final static int TRANSACTION_EXISTS = 5001; + + /** MVCC disabled. */ + public final static int MVCC_DISABLED = 5002; + + /** Transaction type mismatch (SQL/non SQL). */ + public final static int TRANSACTION_TYPE_MISMATCH = 5003; + + /** Transaction is already completed. */ + public final static int TRANSACTION_COMPLETED = 5004; + /** */ private IgniteQueryErrorCode() { // No-op. @@ -159,6 +173,12 @@ public final class IgniteQueryErrorCode { case KEY_UPDATE: return SqlStateCode.PARSING_EXCEPTION; + case MVCC_DISABLED: + case TRANSACTION_EXISTS: + case TRANSACTION_TYPE_MISMATCH: + case TRANSACTION_COMPLETED: + return SqlStateCode.TRANSACTION_STATE_EXCEPTION; + default: return SqlStateCode.INTERNAL_ERROR; } http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java index ff10e3d..676e61c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.internal.processors.query.NestedTxMode; import org.apache.ignite.internal.util.typedef.F; /** @@ -36,6 +37,12 @@ public final class SqlFieldsQueryEx extends SqlFieldsQuery { /** Whether server side DML should be enabled. */ private boolean skipReducerOnUpdate; + /** Auto commit flag. */ + private boolean autoCommit = true; + + /** Nested transactions handling mode. */ + private NestedTxMode nestedTxMode = NestedTxMode.DEFAULT; + /** Batched arguments list. */ private List<Object[]> batchedArgs; @@ -57,6 +64,8 @@ public final class SqlFieldsQueryEx extends SqlFieldsQuery { this.isQry = qry.isQry; this.skipReducerOnUpdate = qry.skipReducerOnUpdate; + this.autoCommit = qry.autoCommit; + this.nestedTxMode = qry.nestedTxMode; this.batchedArgs = qry.batchedArgs; } @@ -159,6 +168,36 @@ public final class SqlFieldsQueryEx extends SqlFieldsQuery { return skipReducerOnUpdate; } + /** + * @return Nested transactions handling mode - behavior when the user attempts to open a transaction in scope of + * another transaction. + */ + public NestedTxMode getNestedTxMode() { + return nestedTxMode; + } + + /** + * @param nestedTxMode Nested transactions handling mode - behavior when the user attempts to open a transaction + * in scope of another transaction. + */ + public void setNestedTxMode(NestedTxMode nestedTxMode) { + this.nestedTxMode = nestedTxMode; + } + + /** + * @return Auto commit flag. + */ + public boolean isAutoCommit() { + return autoCommit; + } + + /** + * @param autoCommit Auto commit flag. + */ + public void setAutoCommit(boolean autoCommit) { + this.autoCommit = autoCommit; + } + /** {@inheritDoc} */ @Override public SqlFieldsQuery copy() { return new SqlFieldsQueryEx(this); http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 4005dd8..996e7f4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -29,6 +29,7 @@ import java.util.Iterator; import java.util.Map; import java.util.NoSuchElementException; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -76,7 +77,6 @@ import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.resources.LoggerResource; import org.jetbrains.annotations.Nullable; -import java.util.concurrent.ConcurrentHashMap; import static javax.cache.event.EventType.CREATED; import static javax.cache.event.EventType.EXPIRED; @@ -481,6 +481,11 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { final boolean keepBinary, final boolean includeExpired) throws IgniteCheckedException { + //TODO IGNITE-7953 + if (!cctx.atomic() && cctx.kernalContext().config().isMvccEnabled()) + throw new UnsupportedOperationException("Continuous queries are not supported for transactional caches " + + "when MVCC is enabled."); + IgniteOutClosure<CacheContinuousQueryHandler> clsr; if (rmtTransFactory != null) { @@ -741,7 +746,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { final Iterator<CacheDataRow> it = cctx.offheap().cacheIterator(cctx.cacheId(), true, true, - AffinityTopologyVersion.NONE); + AffinityTopologyVersion.NONE, + null); locLsnr.onUpdated(new Iterable<CacheEntryEvent>() { @Override public Iterator<CacheEntryEvent> iterator() { http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index 9e06d9d..4acf078 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.lang.GridTuple; @@ -262,6 +263,11 @@ public interface IgniteInternalTx { public boolean activeCachesDeploymentEnabled(); /** + * @param depEnabled Flag indicating whether deployment is enabled for caches from this transaction or not. + */ + public void activeCachesDeploymentEnabled(boolean depEnabled); + + /** * Attempts to set topology version and returns the current value. * If topology version was previously set, then it's value will * be returned (but not updated). @@ -634,4 +640,14 @@ public interface IgniteInternalTx { * @param e Commit error. */ public void commitError(Throwable e); + + /** + * @param mvccSnapshot Mvcc snapshot. + */ + public void mvccSnapshot(MvccSnapshot mvccSnapshot); + + /** + * @return Mvcc snapshot. + */ + public MvccSnapshot mvccSnapshot(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java index 25ba849..11bf219 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java @@ -31,9 +31,9 @@ import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionException; import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionMetrics; -import org.apache.ignite.transactions.TransactionException; import org.jetbrains.annotations.Nullable; /** @@ -48,6 +48,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx { /** * @param cctx Cache shared context. + * @param lb Label. */ public IgniteTransactionsImpl(GridCacheSharedContext<K, V> cctx, @Nullable String lb) { this.cctx = cctx; @@ -175,6 +176,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx { isolation, timeout, true, + null, txSize, lb ); http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index bdd0c53..ee5a58e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheInvokeEntry; import org.apache.ignite.internal.processors.cache.CacheLazyEntry; import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheOperationContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; @@ -62,6 +63,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; +import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; +import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; import org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry; @@ -71,6 +74,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEnt import org.apache.ignite.internal.processors.cluster.BaselineTopology; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; +import org.apache.ignite.internal.util.GridIntIterator; import org.apache.ignite.internal.util.GridSetWrapper; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter; @@ -84,6 +88,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.thread.IgniteThread; import org.apache.ignite.transactions.TransactionConcurrency; @@ -263,6 +268,13 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement /** UUID to consistent id mapper. */ protected ConsistentIdMapper consistentIdMapper; + /** Mvcc tx update snapshot. */ + protected volatile MvccSnapshot mvccSnapshot; + + /** Rollback finish future. */ + @GridToStringExclude + private volatile IgniteInternalFuture rollbackFut; + /** * Empty constructor required for {@link Externalizable}. */ @@ -382,6 +394,18 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** + * @return Mvcc info. + */ + @Override @Nullable public MvccSnapshot mvccSnapshot() { + return mvccSnapshot; + } + + /** {@inheritDoc} */ + @Override public void mvccSnapshot(MvccSnapshot mvccSnapshot) { + this.mvccSnapshot = mvccSnapshot; + } + + /** * @return Shared cache context. */ public GridCacheSharedContext<?, ?> context() { @@ -687,6 +711,20 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** + * @return Rollback future. + */ + public IgniteInternalFuture rollbackFuture() { + return rollbackFut; + } + + /** + * @param fut Rollback future. + */ + public void rollbackFuture(IgniteInternalFuture fut) { + rollbackFut = fut; + } + + /** * Gets remaining allowed transaction time. * * @return Remaining transaction time. {@code 0} if timeout isn't specified. {@code -1} if time is out. @@ -1109,9 +1147,62 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement if (state != ACTIVE && state != SUSPENDED) seal(); - if (cctx.wal() != null && cctx.tm().logTxRecords() && txNodes != null) { + if (state == PREPARED || state == COMMITTED || state == ROLLED_BACK) { + if (mvccSnapshot != null) { + byte txState; + + switch (state) { + case PREPARED: + txState = TxState.PREPARED; + break; + case ROLLED_BACK: + txState = TxState.ABORTED; + break; + case COMMITTED: + txState = TxState.COMMITTED; + break; + default: + throw new IllegalStateException("Illegal state: " + state); + } + + try { + if (!cctx.localNode().isClient()) { + if (dht() && remote()) + cctx.coordinators().updateState(mvccSnapshot, txState, false); + else if (local()) { + IgniteInternalFuture<?> rollbackFut = rollbackFuture(); + + boolean syncUpdate = txState == TxState.PREPARED || txState == TxState.COMMITTED || + rollbackFut == null || rollbackFut.isDone(); + + if (syncUpdate) + cctx.coordinators().updateState(mvccSnapshot, txState); + else { + // If tx was aborted, we need to wait tx log is updated on all backups. + rollbackFut.listen(new IgniteInClosure<IgniteInternalFuture>() { + @Override public void apply(IgniteInternalFuture fut) { + try { + cctx.coordinators().updateState(mvccSnapshot, txState); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to log TxState: " + txState, e); + } + } + }); + } + } + } + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to log TxState: " + txState, e); + + throw new IgniteException("Failed to log TxState: " + txState, e); + } + } + // Log tx state change to WAL. - if (state == PREPARED || state == COMMITTED || state == ROLLED_BACK) { + if (cctx.wal() != null && cctx.tm().logTxRecords() && txNodes != null) { + BaselineTopology baselineTop = cctx.kernalContext().state().clusterState().baselineTopology(); Map<Short, Collection<Short>> participatingNodes = consistentIdMapper @@ -1569,7 +1660,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement /*closure name */recordEvt ? F.first(txEntry.entryProcessors()).get1() : null, resolveTaskName(), null, - keepBinary); + keepBinary, + null); // TODO IGNITE-7371 } boolean modified = false; @@ -1757,6 +1849,32 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** + * Notify Dr on tx finished. + * + * @param commit {@code True} if commited, {@code False} otherwise. + */ + protected void notifyDrManager(boolean commit) { + if (system() || internal()) + return; + + IgniteTxState txState = txState(); + + if (mvccSnapshot == null || txState.cacheIds().isEmpty()) + return; + + GridIntIterator iter = txState.cacheIds().iterator(); + + while (iter.hasNext()) { + int cacheId = iter.next(); + + GridCacheContext ctx0 = cctx.cacheContext(cacheId); + + if (ctx0.isDrEnabled()) + ctx0.dr().onTxFinished(mvccSnapshot, commit, topologyVersionSnapshot()); + } + } + + /** * @param e Transaction entry. * @param primaryOnly Flag to include backups into check or not. * @return {@code True} if entry is locally mapped as a primary or back up node. @@ -1886,6 +2004,16 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement return xidVer.hashCode(); } + /** + * Adds cache to the list of active caches in transaction. + * + * @param cacheCtx Cache context to add. + * @param recovery Recovery flag. See {@link CacheOperationContext#setRecovery(boolean)}. + * @throws IgniteCheckedException If caches already enlisted in this transaction are not compatible with given + * cache (e.g. they have different stores). + */ + public abstract void addActiveCache(GridCacheContext cacheCtx, boolean recovery) throws IgniteCheckedException; + /** {@inheritDoc} */ @Override public String toString() { return GridToStringBuilder.toString(IgniteTxAdapter.class, this, @@ -1960,6 +2088,16 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ + @Override public void mvccSnapshot(MvccSnapshot mvccSnapshot) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public MvccSnapshot mvccSnapshot() { + return null; + } + + /** {@inheritDoc} */ @Override public boolean localResult() { return false; } @@ -2040,6 +2178,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ + @Override public void activeCachesDeploymentEnabled(boolean depEnabled) { + throw new IllegalStateException("Deserialized transaction can only be used as read-only."); + } + + /** {@inheritDoc} */ @Nullable @Override public Object addMeta(int key, Object val) { throw new IllegalStateException("Deserialized transaction can only be used as read-only."); } http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 2eba3ac..7541b43 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -342,8 +342,6 @@ public class IgniteTxHandler { return new GridFinishedFuture<>(e); } - assert firstEntry != null : req; - GridDhtTxLocal tx = null; GridCacheVersion mappedVer = ctx.tm().mappedVersion(req.version()); @@ -363,6 +361,8 @@ public class IgniteTxHandler { GridDhtPartitionTopology top = null; if (req.firstClientRequest()) { + assert firstEntry != null : req; + assert req.concurrency() == OPTIMISTIC : req; assert nearNode.isClient() : nearNode; @@ -658,6 +658,8 @@ public class IgniteTxHandler { if (expVer.equals(curVer)) return false; + // TODO IGNITE-6754 check mvcc crd for mvcc enabled txs. + for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes())) { GridCacheContext ctx = e.context(); @@ -912,8 +914,11 @@ public class IgniteTxHandler { else tx = ctx.tm().tx(dhtVer); - if (tx != null) + if (tx != null) { + tx.mvccSnapshot(req.mvccSnapshot()); + req.txState(tx.txState()); + } if (tx == null && locTx != null && !req.commit()) { U.warn(log, "DHT local tx not found for near local tx rollback " + @@ -1378,6 +1383,7 @@ public class IgniteTxHandler { tx.commitVersion(req.commitVersion()); tx.invalidate(req.isInvalidate()); tx.systemInvalidate(req.isSystemInvalidate()); + tx.mvccSnapshot(req.mvccSnapshot()); // Complete remote candidates. tx.doneRemote(req.baseVersion(), null, null, null); @@ -1385,11 +1391,13 @@ public class IgniteTxHandler { tx.setPartitionUpdateCounters( req.partUpdateCounters() != null ? req.partUpdateCounters().array() : null); + tx.updateCountersMap(req.updateCountersMap()); + tx.commitRemoteTx(); } else { tx.doneRemote(req.baseVersion(), null, null, null); - + tx.mvccSnapshot(req.mvccSnapshot()); tx.rollbackRemoteTx(); } } @@ -1424,6 +1432,7 @@ public class IgniteTxHandler { try { tx.commitVersion(req.writeVersion()); tx.invalidate(req.isInvalidate()); + tx.mvccSnapshot(req.mvccSnapshot()); // Complete remote candidates. tx.doneRemote(req.version(), null, null, null); @@ -1608,10 +1617,12 @@ public class IgniteTxHandler { GridDhtTxPrepareRequest req, GridDhtTxPrepareResponse res ) throws IgniteCheckedException { - if (!F.isEmpty(req.writes())) { + if (req.queryUpdate() || !F.isEmpty(req.writes())) { GridDhtTxRemote tx = ctx.tm().tx(req.version()); if (tx == null) { + assert !req.queryUpdate(); + boolean single = req.last() && req.writes().size() == 1; tx = new GridDhtTxRemote( @@ -1718,7 +1729,8 @@ public class IgniteTxHandler { /*transformClo*/null, tx.resolveTaskName(), /*expiryPlc*/null, - /*keepBinary*/true); + /*keepBinary*/true, + null); // TODO IGNITE-7371 if (val == null) val = cacheCtx.toCacheObject(cacheCtx.store().load(null, entry.key())); @@ -1766,7 +1778,7 @@ public class IgniteTxHandler { res.invalidPartitionsByCacheId(tx.invalidPartitions()); - if (tx.empty() && req.last()) { + if (!req.queryUpdate() && tx.empty() && req.last()) { tx.skipCompletedVersions(req.skipCompletedVersion()); tx.rollbackRemoteTx(); http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java index 10b06d8..4619a80 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java @@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; +import org.apache.ignite.internal.util.GridIntList; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -57,9 +58,10 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter { private boolean recovery; /** {@inheritDoc} */ - @Override public void addActiveCache(GridCacheContext ctx, boolean recovery, IgniteTxLocalAdapter tx) + @Override public void addActiveCache(GridCacheContext ctx, boolean recovery, IgniteTxAdapter tx) throws IgniteCheckedException { assert cacheCtx == null : "Cache already set [cur=" + cacheCtx.name() + ", new=" + ctx.name() + ']'; + assert tx.local(); cacheCtx = ctx; this.recovery = recovery; @@ -68,6 +70,11 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter { } /** {@inheritDoc} */ + @Nullable @Override public GridIntList cacheIds() { + return GridIntList.asList(cacheCtx.cacheId()); + } + + /** {@inheritDoc} */ @Nullable @Override public GridCacheContext singleCacheContext(GridCacheSharedContext cctx) { return cacheCtx; } @@ -289,6 +296,13 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter { } /** {@inheritDoc} */ + @Override public boolean mvccEnabled(GridCacheSharedContext cctx) { + GridCacheContext ctx0 = cacheCtx; + + return ctx0 != null && ctx0.mvccEnabled(); + } + + /** {@inheritDoc} */ public String toString() { return S.toString(IgniteTxImplicitSingleStateImpl.class, this); }