This is an automated email from the ASF dual-hosted git repository. agoncharuk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 30a3d23 IGNITE-11605 Recheck metadata difference after put pending future - Fixes #6324. 30a3d23 is described below commit 30a3d2361ad93e61b1036d2b3ff916ece13db904 Author: Anton Kalashnikov <kaa....@yandex.ru> AuthorDate: Wed Mar 27 17:29:04 2019 +0300 IGNITE-11605 Recheck metadata difference after put pending future - Fixes #6324. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> --- .../cache/binary/BinaryMetadataTransport.java | 97 +++++++++++++++++----- .../binary/CacheObjectBinaryProcessorImpl.java | 94 ++++++++++----------- .../cache/BinaryTypeRegistrationTest.java | 2 +- 3 files changed, 121 insertions(+), 72 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java index d2fe972..0d2f6f9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java @@ -20,6 +20,7 @@ import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Queue; import java.util.Set; import java.util.UUID; @@ -37,7 +38,6 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridTopic; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.BinaryMetadata; -import org.apache.ignite.internal.binary.BinaryUtils; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.communication.GridMessageListener; @@ -53,6 +53,7 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.binary.BinaryUtils.mergeMetadata; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; /** @@ -163,27 +164,66 @@ final class BinaryMetadataTransport { /** * Sends request to cluster proposing update for given metadata. * - * @param metadata Metadata proposed for update. + * @param newMeta Metadata proposed for update. * @return Future to wait for update result on. */ - GridFutureAdapter<MetadataUpdateResult> requestMetadataUpdate(BinaryMetadata metadata) { - MetadataUpdateResultFuture resFut = new MetadataUpdateResultFuture(metadata.typeId()); + GridFutureAdapter<MetadataUpdateResult> requestMetadataUpdate(BinaryMetadata newMeta) { + int typeId = newMeta.typeId(); - if (log.isDebugEnabled()) - log.debug("Requesting metadata update for " + metadata.typeId() + "; caller thread is blocked on future " - + resFut); + MetadataUpdateResultFuture resFut; - MetadataUpdateResultFuture oldFut = pendingTypeIdMap.putIfAbsent(metadata.typeId(), resFut); + do { + BinaryMetadataHolder metaHolder = metaLocCache.get(typeId); + + BinaryMetadata oldMeta = Optional.ofNullable(metaHolder) + .map(BinaryMetadataHolder::metadata) + .orElse(null); + + BinaryMetadata mergedMeta = mergeMetadata(oldMeta, newMeta, null); + + if (mergedMeta == oldMeta) { + if (metaHolder.pendingVersion() == metaHolder.acceptedVersion()) + return null; + + return awaitMetadataUpdate(typeId, metaHolder.pendingVersion()); + } + + resFut = new MetadataUpdateResultFuture(typeId); + } + while (!putAndWaitPendingUpdate(typeId, resFut)); + + BinaryMetadataHolder metadataHolder = metaLocCache.get(typeId); + + BinaryMetadata oldMeta = Optional.ofNullable(metadataHolder) + .map(BinaryMetadataHolder::metadata) + .orElse(null); + + Set<Integer> changedSchemas = new LinkedHashSet<>(); + + //Ensure after putting pending future, metadata still has difference. + BinaryMetadata mergedMeta = mergeMetadata(oldMeta, newMeta, changedSchemas); + + if (mergedMeta == oldMeta) { + resFut.onDone(MetadataUpdateResult.createSuccessfulResult()); - if(oldFut != null) return null; + } + + if (log.isDebugEnabled()) { + log.debug("Requesting metadata update [typeId=" + typeId + + ", typeName=" + mergedMeta.typeName() + + ", changedSchemas=" + changedSchemas + + ", holder=" + metadataHolder + + ", fut=" + resFut + + ']'); + } try { synchronized (this) { unlabeledFutures.add(resFut); if (!stopping) - discoMgr.sendCustomEvent(new MetadataUpdateProposedMessage(metadata, ctx.localNodeId())); + discoMgr.sendCustomEvent(new MetadataUpdateProposedMessage(mergedMeta, ctx.localNodeId())); else resFut.onDone(MetadataUpdateResult.createUpdateDisabledResult()); } @@ -199,6 +239,31 @@ final class BinaryMetadataTransport { } /** + * Put new update future and it are waiting pending future if it exists. + * + * @param typeId Type id. + * @param metaUpdateFut New metadata update future. + * @return {@code true} If given future put successfully. + */ + private boolean putAndWaitPendingUpdate(int typeId, MetadataUpdateResultFuture metaUpdateFut) { + MetadataUpdateResultFuture oldFut = pendingTypeIdMap.putIfAbsent(typeId, metaUpdateFut); + + if (oldFut != null) { + try { + oldFut.get(); + } + catch (IgniteCheckedException ignore) { + //Stacktrace will be logged in thread which created this future. + log.warning("Pending update metadata process was failed. Trying to update to new metadata."); + } + + return false; + } + + return true; + } + + /** * Allows thread to wait for a metadata of given typeId and version to be accepted by the cluster. * * @param typeId ID of binary type. @@ -223,14 +288,6 @@ final class BinaryMetadataTransport { } /** - * @param typeId Type id. - * @return Pending meta update future. - */ - GridFutureAdapter<MetadataUpdateResult> getPendingMetaUpdate(int typeId) { - return pendingTypeIdMap.get(typeId); - } - - /** * Await specific schema update. * @param typeId Type id. * @param schemaId Schema id. @@ -331,7 +388,7 @@ final class BinaryMetadataTransport { try { Set<Integer> changedSchemas = new LinkedHashSet<>(); - BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(locMeta, msg.metadata(), changedSchemas); + BinaryMetadata mergedMeta = mergeMetadata(locMeta, msg.metadata(), changedSchemas); if (log.isDebugEnabled()) log.debug("Versions are stamped on coordinator" + @@ -410,7 +467,7 @@ final class BinaryMetadataTransport { Set<Integer> changedSchemas = new LinkedHashSet<>(); try { - BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(locMeta, msg.metadata(), changedSchemas); + BinaryMetadata mergedMeta = mergeMetadata(locMeta, msg.metadata(), changedSchemas); BinaryMetadataHolder newHolder = new BinaryMetadataHolder(mergedMeta, pendingVer, acceptedVer); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index 3dc4b45..4cce520 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.binary; +import javax.cache.CacheException; import java.io.File; import java.io.Serializable; import java.math.BigDecimal; @@ -26,14 +27,11 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import javax.cache.CacheException; import org.apache.ignite.IgniteBinary; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; @@ -121,6 +119,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAIT_SCHEMA_UPDATE; import static org.apache.ignite.IgniteSystemProperties.getBoolean; import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.BINARY_PROC; +import static org.apache.ignite.internal.binary.BinaryUtils.mergeMetadata; /** * Binary processor implementation. @@ -223,9 +222,7 @@ public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter impleme BinaryMetadata oldMeta = holder != null ? holder.metadata() : null; - BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata( - oldMeta, ((BinaryTypeImpl)newMeta).metadata() - ); + BinaryMetadata mergedMeta = mergeMetadata(oldMeta, ((BinaryTypeImpl)newMeta).metadata()); if (oldMeta != mergedMeta) metadataLocCache.put(typeId, new BinaryMetadataHolder(mergedMeta, 0, 0)); @@ -536,52 +533,23 @@ public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter impleme BinaryMetadata newMeta0 = ((BinaryTypeImpl)newMeta).metadata(); - try { - GridFutureAdapter<MetadataUpdateResult> fut; - Set<Integer> changedSchemas; - BinaryMetadataHolder metaHolder; - - do { - metaHolder = metadataLocCache.get(typeId); - - BinaryMetadata oldMeta = metaHolder != null ? metaHolder.metadata() : null; - - changedSchemas = new LinkedHashSet<>(); - - BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0, changedSchemas); - - if (mergedMeta != oldMeta) { - if (failIfUnregistered) - throw new UnregisteredBinaryTypeException(typeId, mergedMeta); + if (failIfUnregistered) { + failIfUnregistered(typeId, newMeta0); - fut = transport.requestMetadataUpdate(mergedMeta); - } - else { - if (metaHolder.pendingVersion() == metaHolder.acceptedVersion()) - return; + return; + } - // Metadata locally is up-to-date. Waiting for updating metadata in an entire cluster, if necessary. - fut = transport.awaitMetadataUpdate(typeId, metaHolder.pendingVersion()); + try { + GridFutureAdapter<MetadataUpdateResult> fut = transport.requestMetadataUpdate(newMeta0); - if (failIfUnregistered && !fut.isDone()) - throw new UnregisteredBinaryTypeException(typeId, fut); + if (fut == null) { + if (log.isDebugEnabled()) { + log.debug("Metadata update was skipped [typeId=" + typeId + + ", typeName=" + newMeta.typeName() + ']'); } - if (fut == null) { - GridFutureAdapter<MetadataUpdateResult> pending = transport.getPendingMetaUpdate(typeId); - - if (pending != null) { - try { - pending.get(); - } - catch (IgniteCheckedException ignore) { - //Stacktrace will be logged in thread which created this future. - log.warning("Pending update metadata process was failed. Trying to update to new metadata."); - } - } - } + return; } - while (fut == null); long t0 = System.nanoTime(); @@ -592,9 +560,7 @@ public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter impleme log.debug("Completed metadata update [typeId=" + typeId + ", typeName=" + newMeta.typeName() + - ", changedSchemas=" + changedSchemas + ", waitTime=" + MILLISECONDS.convert(System.nanoTime() - t0, NANOSECONDS) + "ms" + - ", holder=" + metaHolder + ", fut=" + fut + ", tx=" + CU.txString(tx) + ']'); @@ -618,6 +584,32 @@ public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter impleme } } + /** + * Throw specific exception if given binary metadata is unregistered. + * + * @param typeId Type id. + * @param newMeta0 Expected binary metadata. + */ + private void failIfUnregistered(int typeId, BinaryMetadata newMeta0) { + BinaryMetadataHolder metaHolder = metadataLocCache.get(typeId); + + BinaryMetadata oldMeta = metaHolder != null ? metaHolder.metadata() : null; + + BinaryMetadata mergedMeta = mergeMetadata(oldMeta, newMeta0); + + if (mergedMeta != oldMeta) + throw new UnregisteredBinaryTypeException(typeId, mergedMeta); + + if (metaHolder.pendingVersion() == metaHolder.acceptedVersion()) + return; + + // Metadata locally is up-to-date. Waiting for updating metadata in an entire cluster, if necessary. + GridFutureAdapter<MetadataUpdateResult> fut = transport.awaitMetadataUpdate(typeId, metaHolder.pendingVersion()); + + if (!fut.isDone()) + throw new UnregisteredBinaryTypeException(typeId, fut); + } + /** {@inheritDoc} */ @Override public void addMetaLocally(int typeId, BinaryType newMeta) throws BinaryObjectException { assert newMeta != null; @@ -630,7 +622,7 @@ public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter impleme BinaryMetadata oldMeta = metaHolder != null ? metaHolder.metadata() : null; try { - BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0); + BinaryMetadata mergedMeta = mergeMetadata(oldMeta, newMeta0); if (!ctx.clientNode()) metadataFileStore.mergeAndWriteMetadata(mergedMeta); @@ -1302,7 +1294,7 @@ public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter impleme continue; try { - BinaryUtils.mergeMetadata(locMeta, rmtMeta); + mergeMetadata(locMeta, rmtMeta); } catch (Exception e) { String locMsg = "Exception was thrown when merging binary metadata from node %s: %s"; @@ -1361,7 +1353,7 @@ public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter impleme BinaryMetadata newMeta = metaEntry.getValue().metadata(); BinaryMetadata localMeta = localMetaHolder.metadata(); - BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(localMeta, newMeta); + BinaryMetadata mergedMeta = mergeMetadata(localMeta, newMeta); if (mergedMeta != localMeta) { //put mergedMeta to local cache and store to disk diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryTypeRegistrationTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryTypeRegistrationTest.java index e031e4f..b49f395 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryTypeRegistrationTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryTypeRegistrationTest.java @@ -103,7 +103,7 @@ public class BinaryTypeRegistrationTest extends GridCommonAbstractTest { exec.shutdown(); exec.awaitTermination(10, TimeUnit.SECONDS); - assertEquals(1, metadataUpdateProposedMessages.size()); + assertEquals(metadataUpdateProposedMessages.toString(), 1, metadataUpdateProposedMessages.size()); } /**