Fixing metadata update under changing topology.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6ef7fb60 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6ef7fb60 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6ef7fb60 Branch: refs/heads/ignite-1282 Commit: 6ef7fb604f6e56e9ea72d3ef6d6ce199564c1c41 Parents: ee5d9fd Author: Alexey Goncharuk <alexey.goncha...@gmail.com> Authored: Sat Nov 28 15:56:22 2015 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Sat Nov 28 15:56:22 2015 +0300 ---------------------------------------------------------------------- .../internal/portable/BinaryEnumObjectImpl.java | 2 +- .../internal/portable/BinaryReaderExImpl.java | 6 +- .../internal/portable/BinaryWriterExImpl.java | 10 +- .../internal/portable/PortableContext.java | 31 ++- .../ignite/internal/portable/PortableUtils.java | 10 +- .../builder/BinaryObjectBuilderImpl.java | 2 +- .../portable/builder/PortableBuilderEnum.java | 2 +- .../builder/PortableEnumArrayLazyValue.java | 2 +- .../builder/PortableObjectArrayLazyValue.java | 2 +- .../colocated/GridDhtColocatedLockFuture.java | 17 +- .../distributed/near/GridNearLockFuture.java | 17 +- ...arOptimisticSerializableTxPrepareFuture.java | 12 +- .../near/GridNearOptimisticTxPrepareFuture.java | 134 +++++----- .../cache/transactions/IgniteTxManager.java | 22 +- .../portable/BinaryMarshallerSelfTest.java | 2 +- ...yMetadataUpdateChangingTopologySelfTest.java | 245 +++++++++++++++++++ .../replicated/GridReplicatedTxPreloadTest.java | 6 + .../IgnitePortableObjectsTestSuite.java | 2 + 18 files changed, 425 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6ef7fb60/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryEnumObjectImpl.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryEnumObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryEnumObjectImpl.java index e13c076..467d767 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryEnumObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryEnumObjectImpl.java @@ -110,7 +110,7 @@ public class BinaryEnumObjectImpl implements BinaryObjectEx, Externalizable, Cac /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public <T> T deserialize() throws BinaryObjectException { - Class cls = PortableUtils.resolveClass(ctx, typeId, clsName, null); + Class cls = PortableUtils.resolveClass(ctx, typeId, clsName, null, true); return BinaryEnumCache.get(cls, ord); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6ef7fb60/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java index 3cc2fbe..b9f851c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java @@ -234,7 +234,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina int off = in.position(); // Registers class by type ID, at least locally if the cache is not ready yet. - typeId = ctx.descriptorForClass(PortableUtils.doReadClass(in, ctx, ldr, typeId0)).typeId(); + typeId = ctx.descriptorForClass(PortableUtils.doReadClass(in, ctx, ldr, typeId0), false).typeId(); int clsNameLen = in.position() - off; @@ -277,7 +277,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina * @return Descriptor. 
*/ PortableClassDescriptor descriptor() { - return ctx.descriptorForTypeId(userType, typeId, ldr); + return ctx.descriptorForTypeId(userType, typeId, ldr, true); } /** @@ -1427,7 +1427,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina break; case OBJ: - PortableClassDescriptor desc = ctx.descriptorForTypeId(userType, typeId, ldr); + PortableClassDescriptor desc = ctx.descriptorForTypeId(userType, typeId, ldr, true); streamPosition(dataStart); http://git-wip-us.apache.org/repos/asf/ignite/blob/6ef7fb60/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryWriterExImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryWriterExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryWriterExImpl.java index 9d1d037..95807ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryWriterExImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryWriterExImpl.java @@ -182,7 +182,7 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje Class<?> cls = obj.getClass(); - PortableClassDescriptor desc = ctx.descriptorForClass(cls); + PortableClassDescriptor desc = ctx.descriptorForClass(cls, false); if (desc == null) throw new BinaryObjectException("Object is not portable: [class=" + cls + ']'); @@ -697,7 +697,7 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje if (tryWriteAsHandle(val)) return; - PortableClassDescriptor desc = ctx.descriptorForClass(val.getClass().getComponentType()); + PortableClassDescriptor desc = ctx.descriptorForClass(val.getClass().getComponentType(), false); out.unsafeEnsure(1 + 4); out.unsafeWriteByte(OBJ_ARR); @@ -785,7 +785,7 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje if (val == null) out.writeByte(NULL); else { - 
PortableClassDescriptor desc = ctx.descriptorForClass(val.getClass()); + PortableClassDescriptor desc = ctx.descriptorForClass(val.getClass(), false); out.unsafeEnsure(1 + 4); @@ -830,7 +830,7 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje if (val == null) out.writeByte(NULL); else { - PortableClassDescriptor desc = ctx.descriptorForClass(val.getClass().getComponentType()); + PortableClassDescriptor desc = ctx.descriptorForClass(val.getClass().getComponentType(), false); out.unsafeEnsure(1 + 4); @@ -859,7 +859,7 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje if (val == null) out.writeByte(NULL); else { - PortableClassDescriptor desc = ctx.descriptorForClass(val); + PortableClassDescriptor desc = ctx.descriptorForClass(val, false); out.unsafeEnsure(1 + 4); http://git-wip-us.apache.org/repos/asf/ignite/blob/6ef7fb60/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java index ae0d940..1482df9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java @@ -402,14 +402,14 @@ public class PortableContext implements Externalizable { * @return Class descriptor. * @throws org.apache.ignite.binary.BinaryObjectException In case of error. 
*/ - public PortableClassDescriptor descriptorForClass(Class<?> cls) + public PortableClassDescriptor descriptorForClass(Class<?> cls, boolean deserialize) throws BinaryObjectException { assert cls != null; PortableClassDescriptor desc = descByCls.get(cls); if (desc == null || !desc.registered()) - desc = registerClassDescriptor(cls); + desc = registerClassDescriptor(cls, deserialize); return desc; } @@ -420,7 +420,12 @@ public class PortableContext implements Externalizable { * @param ldr Class loader. * @return Class descriptor. */ - public PortableClassDescriptor descriptorForTypeId(boolean userType, int typeId, ClassLoader ldr) { + public PortableClassDescriptor descriptorForTypeId( + boolean userType, + int typeId, + ClassLoader ldr, + boolean deserialize + ) { assert typeId != GridPortableMarshaller.UNREGISTERED_TYPE_ID; //TODO: As a workaround for IGNITE-1358 we always check the predefined map before without checking 'userType' @@ -450,21 +455,21 @@ public class PortableContext implements Externalizable { } catch (ClassNotFoundException e) { // Class might have been loaded by default class loader. - if (userType && !ldr.equals(dfltLdr) && (desc = descriptorForTypeId(true, typeId, dfltLdr)) != null) + if (userType && !ldr.equals(dfltLdr) && (desc = descriptorForTypeId(true, typeId, dfltLdr, deserialize)) != null) return desc; throw new BinaryInvalidTypeException(e); } catch (IgniteCheckedException e) { // Class might have been loaded by default class loader. 
- if (userType && !ldr.equals(dfltLdr) && (desc = descriptorForTypeId(true, typeId, dfltLdr)) != null) + if (userType && !ldr.equals(dfltLdr) && (desc = descriptorForTypeId(true, typeId, dfltLdr, deserialize)) != null) return desc; throw new BinaryObjectException("Failed resolve class for ID: " + typeId, e); } if (desc == null) { - desc = registerClassDescriptor(cls); + desc = registerClassDescriptor(cls, deserialize); assert desc.typeId() == typeId; } @@ -478,7 +483,7 @@ public class PortableContext implements Externalizable { * @param cls Class. * @return Class descriptor. */ - private PortableClassDescriptor registerClassDescriptor(Class<?> cls) { + private PortableClassDescriptor registerClassDescriptor(Class<?> cls, boolean deserialize) { PortableClassDescriptor desc; String clsName = cls.getName(); @@ -503,7 +508,7 @@ public class PortableContext implements Externalizable { desc = old; } else - desc = registerUserClassDescriptor(cls); + desc = registerUserClassDescriptor(cls, deserialize); return desc; } @@ -514,7 +519,7 @@ public class PortableContext implements Externalizable { * @param cls Class. * @return Class descriptor. */ - private PortableClassDescriptor registerUserClassDescriptor(Class<?> cls) { + private PortableClassDescriptor registerUserClassDescriptor(Class<?> cls, boolean deserialize) { boolean registered; String typeName = typeName(cls.getName()); @@ -545,10 +550,12 @@ public class PortableContext implements Externalizable { false /* predefined */ ); - Collection<PortableSchema> schemas = desc.schema() != null ? Collections.singleton(desc.schema()) : null; + if (!deserialize) { + Collection<PortableSchema> schemas = desc.schema() != null ? 
Collections.singleton(desc.schema()) : null; - metaHnd.addMeta(typeId, - new BinaryMetadata(typeId, typeName, desc.fieldsMeta(), affFieldName, schemas, desc.isEnum()).wrap(this)); + metaHnd.addMeta(typeId, + new BinaryMetadata(typeId, typeName, desc.fieldsMeta(), affFieldName, schemas, desc.isEnum()).wrap(this)); + } // perform put() instead of putIfAbsent() because "registered" flag might have been changed or class loader // might have reloaded described class. http://git-wip-us.apache.org/repos/asf/ignite/blob/6ef7fb60/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java index 1a8f156..5d794ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java @@ -1366,7 +1366,7 @@ public class PortableUtils { return Object.class; if (typeId != UNREGISTERED_TYPE_ID) - cls = ctx.descriptorForTypeId(true, typeId, ldr).describedClass(); + cls = ctx.descriptorForTypeId(true, typeId, ldr, false).describedClass(); else { String clsName = doReadClassName(in); @@ -1378,7 +1378,7 @@ public class PortableUtils { } // forces registering of class by type id, at least locally - ctx.descriptorForClass(cls); + ctx.descriptorForClass(cls, true); } return cls; @@ -1394,14 +1394,14 @@ public class PortableUtils { * @return Resovled class. 
*/ public static Class resolveClass(PortableContext ctx, int typeId, @Nullable String clsName, - @Nullable ClassLoader ldr) { + @Nullable ClassLoader ldr, boolean deserialize) { Class cls; if (typeId == OBJECT_TYPE_ID) return Object.class; if (typeId != UNREGISTERED_TYPE_ID) - cls = ctx.descriptorForTypeId(true, typeId, ldr).describedClass(); + cls = ctx.descriptorForTypeId(true, typeId, ldr, deserialize).describedClass(); else { try { cls = U.forName(clsName, ldr); @@ -1411,7 +1411,7 @@ public class PortableUtils { } // forces registering of class by type id, at least locally - ctx.descriptorForClass(cls); + ctx.descriptorForClass(cls, true); } return cls; http://git-wip-us.apache.org/repos/asf/ignite/blob/6ef7fb60/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/BinaryObjectBuilderImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/BinaryObjectBuilderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/BinaryObjectBuilderImpl.java index 5c2c713..caf3b88 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/BinaryObjectBuilderImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/BinaryObjectBuilderImpl.java @@ -161,7 +161,7 @@ public class BinaryObjectBuilderImpl implements BinaryObjectBuilder { throw new BinaryInvalidTypeException("Failed to load the class: " + clsNameToWrite, e); } - this.typeId = ctx.descriptorForClass(cls).typeId(); + this.typeId = ctx.descriptorForClass(cls, false).typeId(); registeredType = false; http://git-wip-us.apache.org/repos/asf/ignite/blob/6ef7fb60/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderEnum.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderEnum.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderEnum.java index 9eb77b4..6f79e73 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderEnum.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderEnum.java @@ -64,7 +64,7 @@ public class PortableBuilderEnum implements PortableBuilderSerializationAware { throw new BinaryInvalidTypeException("Failed to load the class: " + clsName, e); } - this.typeId = reader.portableContext().descriptorForClass(cls).typeId(); + this.typeId = reader.portableContext().descriptorForClass(cls, false).typeId(); } else { this.typeId = typeId; http://git-wip-us.apache.org/repos/asf/ignite/blob/6ef7fb60/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableEnumArrayLazyValue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableEnumArrayLazyValue.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableEnumArrayLazyValue.java index 1e2ebc9..91c1c87 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableEnumArrayLazyValue.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableEnumArrayLazyValue.java @@ -57,7 +57,7 @@ class PortableEnumArrayLazyValue extends PortableAbstractLazyValue { throw new BinaryInvalidTypeException("Failed to load the class: " + clsName, e); } - compTypeId = reader.portableContext().descriptorForClass(cls).typeId(); + compTypeId = reader.portableContext().descriptorForClass(cls, true).typeId(); } else { compTypeId = typeId; 
http://git-wip-us.apache.org/repos/asf/ignite/blob/6ef7fb60/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableObjectArrayLazyValue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableObjectArrayLazyValue.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableObjectArrayLazyValue.java index 6634eea..8c8022b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableObjectArrayLazyValue.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableObjectArrayLazyValue.java @@ -56,7 +56,7 @@ class PortableObjectArrayLazyValue extends PortableAbstractLazyValue { throw new BinaryInvalidTypeException("Failed to load the class: " + clsName, e); } - compTypeId = reader.portableContext().descriptorForClass(cls).typeId(); + compTypeId = reader.portableContext().descriptorForClass(cls, true).typeId(); } else { compTypeId = typeId; http://git-wip-us.apache.org/repos/asf/ignite/blob/6ef7fb60/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index ecdf641..d3028ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -977,12 +977,27 @@ public final class GridDhtColocatedLockFuture extends 
GridCompoundIdentityFuture } /** + * @throws IgniteCheckedException If failed. + */ + private void proceedMapping() throws IgniteCheckedException { + boolean set = tx != null && cctx.shared().tm().setTxTopologyHint(tx); + + try { + proceedMapping0(); + } + finally { + if (set) + cctx.tm().setTxTopologyHint(null); + } + } + + /** * Gets next near lock mapping and either acquires dht locks locally or sends near lock request to * remote primary node. * * @throws IgniteCheckedException If mapping can not be completed. */ - private void proceedMapping() + private void proceedMapping0() throws IgniteCheckedException { GridNearLockMapping map; http://git-wip-us.apache.org/repos/asf/ignite/blob/6ef7fb60/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index f1f9990..417303b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -1082,13 +1082,28 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean } /** + * @throws IgniteCheckedException If failed. + */ + private void proceedMapping() throws IgniteCheckedException { + boolean set = tx != null && cctx.shared().tm().setTxTopologyHint(tx); + + try { + proceedMapping0(); + } + finally { + if (set) + cctx.tm().setTxTopologyHint(null); + } + } + + /** * Gets next near lock mapping and either acquires dht locks locally or sends near lock request to * remote primary node. * * @throws IgniteCheckedException If mapping can not be completed. 
*/ @SuppressWarnings("unchecked") - private void proceedMapping() + private void proceedMapping0() throws IgniteCheckedException { GridNearLockMapping map; http://git-wip-us.apache.org/repos/asf/ignite/blob/6ef7fb60/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 916c7ab..f52b3fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -304,9 +304,17 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim return; } - prepare(tx.readEntries(), tx.writeEntries(), remap, topLocked); + boolean set = cctx.tm().setTxTopologyHint(tx); - markInitialized(); + try { + prepare(tx.readEntries(), tx.writeEntries(), remap, topLocked); + + markInitialized(); + } + finally { + if (set) + cctx.tm().setTxTopologyHint(null); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/6ef7fb60/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 
ca1d36c..2ce14af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -405,82 +405,90 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa if (isDone()) return; - assert !m.empty(); - - final ClusterNode n = m.node(); - - GridNearTxPrepareRequest req = new GridNearTxPrepareRequest( - futId, - tx.topologyVersion(), - tx, - null, - m.writes(), - m.near(), - txMapping.transactionNodes(), - m.last(), - tx.onePhaseCommit(), - tx.needReturnValue() && tx.implicit(), - tx.implicitSingle(), - m.explicitLock(), - tx.subjectId(), - tx.taskNameHash(), - m.clientFirst(), - tx.activeCachesDeploymentEnabled()); - - for (IgniteTxEntry txEntry : m.writes()) { - if (txEntry.op() == TRANSFORM) - req.addDhtVersion(txEntry.txKey(), null); - } - - // Must lock near entries separately. - if (m.near()) { - try { - tx.optimisticLockEntries(req.writes()); + boolean set = cctx.tm().setTxTopologyHint(tx); - tx.userPrepare(); + try { + assert !m.empty(); + + final ClusterNode n = m.node(); + + GridNearTxPrepareRequest req = new GridNearTxPrepareRequest( + futId, + tx.topologyVersion(), + tx, + null, + m.writes(), + m.near(), + txMapping.transactionNodes(), + m.last(), + tx.onePhaseCommit(), + tx.needReturnValue() && tx.implicit(), + tx.implicitSingle(), + m.explicitLock(), + tx.subjectId(), + tx.taskNameHash(), + m.clientFirst(), + tx.activeCachesDeploymentEnabled()); + + for (IgniteTxEntry txEntry : m.writes()) { + if (txEntry.op() == TRANSFORM) + req.addDhtVersion(txEntry.txKey(), null); } - catch (IgniteCheckedException e) { - onError(e); + + // Must lock near entries separately. 
+ if (m.near()) { + try { + tx.optimisticLockEntries(req.writes()); + + tx.userPrepare(); + } + catch (IgniteCheckedException e) { + onError(e); + } } - } - final MiniFuture fut = new MiniFuture(m, mappings); + final MiniFuture fut = new MiniFuture(m, mappings); - req.miniId(fut.futureId()); + req.miniId(fut.futureId()); - add(fut); // Append new future. + add(fut); // Append new future. - // If this is the primary node for the keys. - if (n.isLocal()) { - // At this point, if any new node joined, then it is - // waiting for this transaction to complete, so - // partition reassignments are not possible here. - IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(n.id(), tx, req); + // If this is the primary node for the keys. + if (n.isLocal()) { + // At this point, if any new node joined, then it is + // waiting for this transaction to complete, so + // partition reassignments are not possible here. + IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(n.id(), tx, req); - prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() { - @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) { - try { - fut.onResult(n.id(), prepFut.get()); - } - catch (IgniteCheckedException e) { - fut.onResult(e); + prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() { + @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) { + try { + fut.onResult(n.id(), prepFut.get()); + } + catch (IgniteCheckedException e) { + fut.onResult(e); + } } - } - }); - } - else { - try { - cctx.io().send(n, req, tx.ioPolicy()); + }); } - catch (ClusterTopologyCheckedException e) { - e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion())); + else { + try { + cctx.io().send(n, req, tx.ioPolicy()); + } + catch (ClusterTopologyCheckedException e) { + e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion())); - 
fut.onResult(e); - } - catch (IgniteCheckedException e) { - fut.onResult(e); + fut.onResult(e); + } + catch (IgniteCheckedException e) { + fut.onResult(e); + } } } + finally { + if (set) + cctx.tm().setTxTopologyHint(null); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/6ef7fb60/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 421b0e6..243c4cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -114,6 +114,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { /** Committing transactions. */ private final ThreadLocal<IgniteInternalTx> threadCtx = new ThreadLocal<>(); + /** Transaction which topology version should be used when mapping internal tx. */ + private final ThreadLocal<IgniteInternalTx> txTopology = new ThreadLocal<>(); + /** Per-thread transaction map. */ private final ConcurrentMap<Long, IgniteInternalTx> threadMap = newMap(); @@ -622,7 +625,24 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { return tx; } - return null; + return txTopology.get(); + } + + /** + * @param tx Transaction. 
+ */ + public boolean setTxTopologyHint(IgniteInternalTx tx) { + if (tx == null) + txTopology.remove(); + else { + if (txTopology.get() == null) { + txTopology.set(tx); + + return true; + } + } + + return false; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/6ef7fb60/modules/core/src/test/java/org/apache/ignite/internal/portable/BinaryMarshallerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/portable/BinaryMarshallerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/portable/BinaryMarshallerSelfTest.java index ad0dcf2..cc035f6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/portable/BinaryMarshallerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/portable/BinaryMarshallerSelfTest.java @@ -2105,7 +2105,7 @@ public class BinaryMarshallerSelfTest extends GridCommonAbstractTest { if (id == GridPortableMarshaller.UNREGISTERED_TYPE_ID) continue; - PortableClassDescriptor desc = pCtx.descriptorForTypeId(false, entry.getValue(), null); + PortableClassDescriptor desc = pCtx.descriptorForTypeId(false, entry.getValue(), null, false); assertEquals(desc.typeId(), pCtx.typeId(desc.describedClass().getName())); assertEquals(desc.typeId(), pCtx.typeId(pCtx.typeName(desc.describedClass().getName()))); http://git-wip-us.apache.org/repos/asf/ignite/blob/6ef7fb60/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java new file mode 100644 index 0000000..e53650c --- 
/dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteNodeAttributes; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; 
+import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.resources.LoggerResource; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; + +/** + * Tests specific scenario when binary metadata should be updated from a system thread + * and topology has been already changed since the original transaction start. + */ +public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridCommonAbstractTest { + /** IP finder set on the discovery SPI of every grid configured by this test. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} Adds a transactional cache named {@code cache} with one backup, a discovery SPI using {@link #IP_FINDER} and a {@link TestCommunicationSpi}. */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + /* NOTE(review): a null marshaller presumably selects the default marshaller - confirm. */ cfg.setMarshaller(null); + + CacheConfiguration ccfg = new CacheConfiguration("cache"); + + ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + ccfg.setBackups(1); + + cfg.setCacheConfiguration(ccfg); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + cfg.setCommunicationSpi(new TestCommunicationSpi()); + + return cfg; + } + + /** {@inheritDoc} Starts the grids through {@code startGrids(4)}. */ + @Override protected void beforeTestsStarted() throws Exception { + startGrids(4); + } + + /** {@inheritDoc} Stops all grids. */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * Blocks {@link GridNearTxPrepareResponse} messages sent by node 1 to node 0, issues an asynchronous + * {@code putAll} from node 0, starts grid 4 while the responses are held back, then releases them and + * waits for both the put and the grid start to finish. Grid 4 is stopped in the {@code finally} block. + * + * @throws Exception If failed. 
+ */ + public void testNoDeadlockOptimistic() throws Exception { + int key1 = primaryKey(ignite(1).cache("cache")); + int key2 = primaryKey(ignite(2).cache("cache")); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite(1).configuration().getCommunicationSpi(); + + /* Hold back prepare responses that node 1 sends to node 0. */ spi.blockMessages(GridNearTxPrepareResponse.class, ignite(0).cluster().localNode().id()); + + IgniteCache<Object, Object> cache = ignite(0).cache("cache").withAsync(); + + cache.putAll(F.asMap(key1, "val1", key2, new TestValue())); + + try { + /* NOTE(review): presumably gives the operation time to reach the prepare phase - confirm. */ Thread.sleep(500); + + IgniteInternalFuture<Void> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + startGrid(4); + + return null; + } + }); + + Thread.sleep(500); + + spi.stopBlock(); + + cache.future().get(); + + fut.get(); + } + finally { + stopGrid(4); + } + } + + /** + * Communication SPI that can record outgoing messages of one class and hold back messages of chosen + * classes addressed to chosen nodes until {@link #stopBlock()} is called. + * Access to the mutable state is guarded by {@code synchronized (this)}. + */ + private static class TestCommunicationSpi extends TcpCommunicationSpi { + /** Logger. */ + @LoggerResource + private IgniteLogger log; + + /** Held-back messages paired with their destination nodes. */ + private List<T2<ClusterNode, GridIoMessage>> blockedMsgs = new ArrayList<>(); + + /** Maps a message class to the IDs of destination nodes for which that class is blocked. */ + private Map<Class<?>, Set<UUID>> blockCls = new HashMap<>(); + + /** Message class to record, or {@code null} when nothing is recorded. */ + private Class<?> recordCls; + + /** Messages recorded since the last {@link #recordedMessages()} call. */ + private List<Object> recordedMsgs = new ArrayList<>(); + + /** {@inheritDoc} Records and/or holds back the payload of a {@link GridIoMessage} according to the current settings; everything else is passed to the superclass. */ + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) + throws IgniteSpiException { + if (msg instanceof GridIoMessage) { + Object msg0 = ((GridIoMessage)msg).message(); + + synchronized (this) { + if (recordCls != null && msg0.getClass().equals(recordCls)) + recordedMsgs.add(msg0); + + Set<UUID> blockNodes = blockCls.get(msg0.getClass()); + + if (F.contains(blockNodes, node.id())) { + log.info("Block message [node=" + node.attribute(IgniteNodeAttributes.ATTR_GRID_NAME) + + ", msg=" + msg0 + ']'); + + blockedMsgs.add(new T2<>(node, (GridIoMessage)msg)); + + /* The message is queued instead of sent; the ack closure is not retained. */ return; + } + } + } + + super.sendMessage(node, msg, ackClosure); + } + + /** + * Sets the message class whose outgoing messages are recorded. + * + * @param 
recordCls Message class to record, or {@code null} to stop recording. + */ + void record(@Nullable Class<?> recordCls) { + synchronized (this) { + this.recordCls = recordCls; + } + } + + /** + * Returns the messages recorded so far and replaces the internal list with a new empty one. + * + * @return Recorded messages. + */ + List<Object> recordedMessages() { + synchronized (this) { + List<Object> msgs = recordedMsgs; + + recordedMsgs = new ArrayList<>(); + + return msgs; + } + } + + /** + * Starts holding back messages of the given class that are addressed to the given node. + * + * @param cls Message class. + * @param nodeId Node ID. + */ + void blockMessages(Class<?> cls, UUID nodeId) { + synchronized (this) { + Set<UUID> set = blockCls.get(cls); + + if (set == null) { + set = new HashSet<>(); + + blockCls.put(cls, set); + } + + set.add(nodeId); + } + } + + /** + * Removes all blocking rules, sends every held-back message to its destination node through the + * two-argument {@code sendMessage}, and then empties the queue of held-back messages. + */ + void stopBlock() { + synchronized (this) { + blockCls.clear(); + + for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) { + ClusterNode node = msg.get1(); + + log.info("Send blocked message: [node=" + node.attribute(IgniteNodeAttributes.ATTR_GRID_NAME) + + ", msg=" + msg.get2().message() + ']'); + + sendMessage(msg.get1(), msg.get2()); + } + + blockedMsgs.clear(); + } + } + } + + /** Value type put into the cache by the test. NOTE(review): presumably chosen so that its binary metadata must be registered during the operation - confirm. */ private static class TestValue { + /** Field1. 
*/ + private String field1; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6ef7fb60/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridReplicatedTxPreloadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridReplicatedTxPreloadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridReplicatedTxPreloadTest.java index e7560c7..126ce61 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridReplicatedTxPreloadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridReplicatedTxPreloadTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.distributed.replicated; import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.processors.cache.distributed.IgniteTxPreloadAbstractTest; import static org.apache.ignite.cache.CacheMode.REPLICATED; @@ -32,6 +33,11 @@ public class GridReplicatedTxPreloadTest extends IgniteTxPreloadAbstractTest { } /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + return super.getConfiguration(gridName).setMarshaller(null); + } + + /** {@inheritDoc} */ @Override public void testLocalTxPreloadingOptimistic() throws Exception { fail("https://issues.apache.org/jira/browse/IGNITE-1755"); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6ef7fb60/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePortableObjectsTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePortableObjectsTestSuite.java 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePortableObjectsTestSuite.java index 629835b..62952b5 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePortableObjectsTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePortableObjectsTestSuite.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.portable.noncompact.BinaryFooterOffsetsOffheap import org.apache.ignite.internal.portable.noncompact.BinaryMarshallerNonCompactSelfTest; import org.apache.ignite.internal.portable.noncompact.BinaryObjectBuilderAdditionalNonCompactSelfTest; import org.apache.ignite.internal.portable.noncompact.BinaryObjectBuilderNonCompactSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.IgniteBinaryMetadataUpdateChangingTopologySelfTest; import org.apache.ignite.internal.processors.cache.portable.GridCacheClientNodeBinaryObjectMetadataMultinodeTest; import org.apache.ignite.internal.processors.cache.portable.GridCacheClientNodeBinaryObjectMetadataTest; import org.apache.ignite.internal.processors.cache.portable.GridCachePortableStoreObjectsSelfTest; @@ -106,6 +107,7 @@ public class IgnitePortableObjectsTestSuite extends TestSuite { suite.addTestSuite(GridCacheClientNodeBinaryObjectMetadataTest.class); suite.addTestSuite(GridCacheClientNodeBinaryObjectMetadataMultinodeTest.class); + suite.addTestSuite(IgniteBinaryMetadataUpdateChangingTopologySelfTest.class); return suite; }