http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsEvictor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsEvictor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsEvictor.java new file mode 100644 index 0000000..3e51ff3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsEvictor.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.util.typedef.internal.GPC; +import org.apache.ignite.internal.util.typedef.internal.LT; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Class that serves asynchronous partition eviction process. + */ +public class GridDhtPartitionsEvictor { + /** Show eviction progress frequency in ms. */ + private static final int SHOW_EVICTION_PROGRESS_FREQ_MS = 2 * 60 * 1000; // 2 Minutes. + + /** */ + private final GridCacheSharedContext<?, ?> ctx; + + /** */ + private final CacheGroupContext grp; + + /** */ + private final IgniteLogger log; + + /** Queue contains partitions scheduled for eviction. */ + private final ConcurrentHashMap<Integer, GridDhtLocalPartition> evictionQueue = new ConcurrentHashMap<>(); + + /** Flag indicates that eviction process is running at the moment, false in other case. */ + private final AtomicBoolean evictionRunning = new AtomicBoolean(); + + /** + * Constructor. + * + * @param grp Cache group context. + */ + public GridDhtPartitionsEvictor(CacheGroupContext grp) { + assert grp != null; + + this.grp = grp; + this.ctx = grp.shared(); + + this.log = ctx.logger(getClass()); + } + + /** + * Adds partition to eviction queue and starts eviction process. + * + * @param part Partition to evict. 
+ */ + public void evictPartitionAsync(GridDhtLocalPartition part) { + evictionQueue.putIfAbsent(part.id(), part); + + if (evictionRunning.compareAndSet(false, true)) { + ctx.kernalContext().closure().callLocalSafe(new GPC<Boolean>() { + @Override public Boolean call() { + boolean locked = true; + + long nextShowProgressTime = U.currentTimeMillis() + SHOW_EVICTION_PROGRESS_FREQ_MS; + + while (locked || !evictionQueue.isEmpty()) { + if (!locked && !evictionRunning.compareAndSet(false, true)) + return false; + + try { + for (GridDhtLocalPartition part : evictionQueue.values()) { + // Show progress of currently evicting partitions. + if (U.currentTimeMillis() >= nextShowProgressTime) { + if (log.isInfoEnabled()) + log.info("Eviction in progress [grp=" + grp.cacheOrGroupName() + + ", remainingCnt=" + evictionQueue.size() + "]"); + + nextShowProgressTime = U.currentTimeMillis() + SHOW_EVICTION_PROGRESS_FREQ_MS; + } + + try { + boolean success = part.tryClear(); + + if (success) + evictionQueue.remove(part.id()); + } + catch (Throwable ex) { + if (ctx.kernalContext().isStopping()) { + LT.warn(log, ex, "Partition eviction failed (current node is stopping).", + false, + true); + + evictionQueue.clear(); + + return true; + } + else + LT.error(log, ex, "Partition eviction failed, this can cause grid hang."); + } + } + } + finally { + if (!evictionQueue.isEmpty()) { + if (ctx.kernalContext().isStopping()) { + evictionQueue.clear(); + + locked = false; + } + else + locked = true; + } + else { + boolean res = evictionRunning.compareAndSet(true, false); + + assert res; + + locked = false; + } + } + } + + return true; + } + }, /*system pool*/ true); + } + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java index de58188..97bd16b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java @@ -189,7 +189,7 @@ public class GridDhtPartitionsReservation implements GridReservable { */ private static void tryEvict(GridDhtLocalPartition part) { if (part.state() == RENTING && part.reservations() == 0) - part.tryEvictAsync(true); + part.tryContinueClearing(); } /** @@ -227,8 +227,6 @@ public class GridDhtPartitionsReservation implements GridReservable { GridDhtLocalPartition part = arr[i]; part.removeReservation(this); - - tryEvict(part); } } @@ -240,7 +238,7 @@ public class GridDhtPartitionsReservation implements GridReservable { } /** - * Must be checked in {@link GridDhtLocalPartition#tryEvict()}. + * Must be checked in {@link GridDhtLocalPartition#tryClear()}. * If returns {@code true} this reservation object becomes invalid and partitions * can be evicted or at least cleared. 
* Also this means that after returning {@code true} here method {@link #reserve()} can not http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java index ebc993c..2d5eec3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import java.io.Serializable; import java.util.Arrays; import java.util.Map; +import java.util.Set; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.U; @@ -93,6 +94,27 @@ public class CachePartitionFullCountersMap implements Serializable { } /** + * Creates submap for provided partition IDs. + * + * @param parts Partition IDs. + * @return Partial counters map. + */ + public CachePartitionPartialCountersMap subMap(Set<Integer> parts) { + CachePartitionPartialCountersMap res = new CachePartitionPartialCountersMap(parts.size()); + + for (int p = 0; p < updCntrs.length; p++) { + if (!parts.contains(p)) + continue; + + res.add(p, initialUpdCntrs[p], updCntrs[p]); + } + + assert res.size() == parts.size(); + + return res; + } + + /** * Clears full counters map. 
*/ public void clear() { http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java index c0de7cf..c8cf3f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java @@ -73,6 +73,13 @@ public class CachePartitionPartialCountersMap implements Serializable { } /** + * @return {@code True} if map is empty. + */ + public boolean isEmpty() { + return curIdx == 0; + } + + /** * Adds partition counters for a partition with the given ID. * * @param partId Partition ID to add. @@ -97,11 +104,38 @@ public class CachePartitionPartialCountersMap implements Serializable { } /** + * Removes element. + * + * @param partId Partition ID. + * @return {@code True} if element was actually removed. + */ + public boolean remove(int partId) { + int removedIdx = partitionIndex(partId); + + if (removedIdx < 0) + return false; + + int lastIdx = --curIdx; + + for (int i = removedIdx; i < lastIdx; i++) { + partIds[i] = partIds[i + 1]; + initialUpdCntrs[i] = initialUpdCntrs[i + 1]; + updCntrs[i] = updCntrs[i + 1]; + } + + partIds[lastIdx] = 0; + initialUpdCntrs[lastIdx] = 0; + updCntrs[lastIdx] = 0; + + return true; + } + + /** * Cuts the array sizes according to curIdx. No more entries can be added to this map * after this method is called. 
*/ public void trim() { - if (curIdx < partIds.length) { + if (partIds != null && curIdx < partIds.length) { partIds = Arrays.copyOf(partIds, curIdx); initialUpdCntrs = Arrays.copyOf(initialUpdCntrs, curIdx); updCntrs = Arrays.copyOf(updCntrs, curIdx); @@ -117,6 +151,14 @@ public class CachePartitionPartialCountersMap implements Serializable { } /** + * @param partId Partition ID. + * @return {@code True} if partition is present in map. + */ + public boolean contains(int partId) { + return partitionIndex(partId) >= 0; + } + + /** * Gets partition ID saved at the given index. * * @param idx Index to get value from. http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandLegacyMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandLegacyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandLegacyMessage.java new file mode 100644 index 0000000..a71aabf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandLegacyMessage.java @@ -0,0 +1,435 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + +import java.io.Externalizable; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridDirectCollection; +import org.apache.ignite.internal.GridDirectMap; +import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.NotNull; + +/** + * Partition demand request. + */ +public class GridDhtPartitionDemandLegacyMessage extends GridCacheGroupIdMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Update sequence. */ + private long updateSeq; + + /** Partition. */ + @GridDirectCollection(int.class) + private Collection<Integer> parts; + + /** Partitions that must be restored from history. */ + @GridDirectCollection(int.class) + private Collection<Integer> historicalParts; + + /** Partition counters. 
*/ + @GridDirectMap(keyType = int.class, valueType = long.class) + private Map<Integer, Long> partsCntrs; + + /** Topic. */ + @GridDirectTransient + private Object topic; + + /** Serialized topic. */ + private byte[] topicBytes; + + /** Timeout. */ + private long timeout; + + /** Worker ID. */ + private int workerId = -1; + + /** Topology version. */ + private AffinityTopologyVersion topVer; + + /** + * @param updateSeq Update sequence for this node. + * @param topVer Topology version. + * @param grpId Cache group ID. + */ + GridDhtPartitionDemandLegacyMessage(long updateSeq, @NotNull AffinityTopologyVersion topVer, int grpId) { + this.grpId = grpId; + this.updateSeq = updateSeq; + this.topVer = topVer; + } + + /** + * @param cp Message to copy from. + * @param parts Partitions. + */ + GridDhtPartitionDemandLegacyMessage(GridDhtPartitionDemandLegacyMessage cp, Collection<Integer> parts, + Map<Integer, Long> partsCntrs) { + grpId = cp.grpId; + updateSeq = cp.updateSeq; + topic = cp.topic; + timeout = cp.timeout; + workerId = cp.workerId; + topVer = cp.topVer; + + // Create a copy of passed in collection since it can be modified when this message is being sent. + this.parts = new HashSet<>(parts); + this.partsCntrs = partsCntrs; + + if (cp.historicalParts != null) + historicalParts = new HashSet<>(cp.historicalParts); + } + + GridDhtPartitionDemandLegacyMessage(GridDhtPartitionDemandMessage cp) { + grpId = cp.groupId(); + updateSeq = cp.rebalanceId() < 0 ? 
-1 : cp.rebalanceId(); + topic = cp.topic(); + timeout = cp.timeout(); + workerId = cp.workerId(); + topVer = cp.topologyVersion(); + + parts = new HashSet<>(cp.partitions().size()); + + parts.addAll(cp.partitions().fullSet()); + + CachePartitionPartialCountersMap histMap = cp.partitions().historicalMap(); + + if (!histMap.isEmpty()) { + historicalParts = new HashSet<>(histMap.size()); + + /* The partsCntrs field has no initializer, so it must be created here; otherwise the put() in the loop below throws NullPointerException for any message carrying historical partitions. */ + partsCntrs = U.newHashMap(histMap.size()); + + for (int i = 0; i < histMap.size(); i++) { + int p = histMap.partitionAt(i); + + parts.add(p); + historicalParts.add(p); + partsCntrs.put(p, histMap.updateCounterAt(i)); + } + } + } + + /** + * Empty constructor required for {@link Externalizable}. + */ + public GridDhtPartitionDemandLegacyMessage() { + // No-op. + } + + /** + * @param p Partition. + */ + void addPartition(int p, boolean historical) { + if (parts == null) + parts = new HashSet<>(); + + parts.add(p); + + if (historical) { + if (historicalParts == null) + historicalParts = new HashSet<>(); + + historicalParts.add(p); + } + } + + /** + * @return Partition. + */ + Collection<Integer> partitions() { + return parts; + } + + /** + * @param p Partition to check. + * @return {@code True} if historical. + */ + boolean isHistorical(int p) { + if (historicalParts == null) + return false; + + return historicalParts.contains(p); + } + + /** + * @param updateSeq Update sequence. + */ + void updateSequence(long updateSeq) { + this.updateSeq = updateSeq; + } + + /** + * @return Update sequence. + */ + long updateSequence() { + return updateSeq; + } + + /** + * @return Reply message timeout. + */ + long timeout() { + return timeout; + } + + /** + * @param timeout Timeout. + */ + void timeout(long timeout) { + this.timeout = timeout; + } + + /** + * @return Topic. + */ + Object topic() { + return topic; + } + + /** + * @param topic Topic. + */ + void topic(Object topic) { + this.topic = topic; + } + + /** + * @return Worker ID. + */ + int workerId() { + return workerId; + } + + /** + * @param workerId Worker ID.
+ */ + void workerId(int workerId) { + this.workerId = workerId; + } + + /** + * @param part Partition to get counter for. + * @return Partition counter associated with this partition or {@code null} if this information is unavailable. + */ + Long partitionCounter(int part) { + return partsCntrs == null ? null : partsCntrs.get(part); + } + + /** + * @return Topology version for which demand message is sent. + */ + @Override public AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** {@inheritDoc} + * @param ctx*/ + @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { + super.prepareMarshal(ctx); + + if (topic != null && topicBytes == null) + topicBytes = U.marshal(ctx, topic); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(ctx, ldr); + + if (topicBytes != null && topic == null) + topic = U.unmarshal(ctx, topicBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + } + + /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeCollection("historicalParts", historicalParts, MessageCollectionItemType.INT)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeCollection("parts", parts, MessageCollectionItemType.INT)) + return false; + + writer.incrementState(); + + case 5: + if (!writer.writeMap("partsCntrs", partsCntrs, MessageCollectionItemType.INT, MessageCollectionItemType.LONG)) + return false; + + writer.incrementState(); + + case 6: + if 
(!writer.writeLong("timeout", timeout)) + return false; + + writer.incrementState(); + + case 7: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + + case 8: + if (!writer.writeByteArray("topicBytes", topicBytes)) + return false; + + writer.incrementState(); + + case 9: + if (!writer.writeLong("updateSeq", updateSeq)) + return false; + + writer.incrementState(); + + case 10: + if (!writer.writeInt("workerId", workerId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + historicalParts = reader.readCollection("historicalParts", MessageCollectionItemType.INT); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + parts = reader.readCollection("parts", MessageCollectionItemType.INT); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + partsCntrs = reader.readMap("partsCntrs", MessageCollectionItemType.INT, MessageCollectionItemType.LONG, false); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: + timeout = reader.readLong("timeout"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 8: + topicBytes = reader.readByteArray("topicBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 9: + updateSeq = reader.readLong("updateSeq"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 10: + workerId = reader.readInt("workerId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } 
+ + return reader.afterMessageRead(GridDhtPartitionDemandMessage.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 44; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 11; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtPartitionDemandLegacyMessage.class, this, + "partCnt", parts != null ? parts.size() : 0, + "super", super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java index 4a693bf..4fba917 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java @@ -19,19 +19,15 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import java.io.Externalizable; import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.HashSet; -import java.util.Map; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.GridDirectCollection; -import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage; +import org.apache.ignite.internal.processors.cache.GridCacheMessage; import 
org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.NotNull; @@ -43,20 +39,18 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage { /** */ private static final long serialVersionUID = 0L; - /** Update sequence. */ - private long updateSeq; + /** */ + public static final IgniteProductVersion VERSION_SINCE = IgniteProductVersion.fromString("2.4.0"); - /** Partition. */ - @GridDirectCollection(int.class) - private Collection<Integer> parts; + /** Rebalance id. */ + private long rebalanceId; - /** Partitions that must be restored from history. */ - @GridDirectCollection(int.class) - private Collection<Integer> historicalParts; + /** Partitions map. */ + @GridDirectTransient + private IgniteDhtDemandedPartitionsMap parts; - /** Partition counters. */ - @GridDirectMap(keyType = int.class, valueType = long.class) - private Map<Integer, Long> partsCntrs; + /** Serialized partitions map. */ + private byte[] partsBytes; /** Topic. */ @GridDirectTransient @@ -75,35 +69,43 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage { private AffinityTopologyVersion topVer; /** - * @param updateSeq Update sequence for this node. + * @param rebalanceId Rebalance id for this node. * @param topVer Topology version. * @param grpId Cache group ID. 
*/ - GridDhtPartitionDemandMessage(long updateSeq, @NotNull AffinityTopologyVersion topVer, int grpId) { + GridDhtPartitionDemandMessage(long rebalanceId, @NotNull AffinityTopologyVersion topVer, int grpId) { this.grpId = grpId; - this.updateSeq = updateSeq; + this.rebalanceId = rebalanceId; this.topVer = topVer; + + parts = new IgniteDhtDemandedPartitionsMap(); } /** * @param cp Message to copy from. - * @param parts Partitions. */ - GridDhtPartitionDemandMessage(GridDhtPartitionDemandMessage cp, Collection<Integer> parts, - Map<Integer, Long> partsCntrs) { - grpId = cp.grpId; - updateSeq = cp.updateSeq; - topic = cp.topic; - timeout = cp.timeout; - workerId = cp.workerId; - topVer = cp.topVer; - - // Create a copy of passed in collection since it can be modified when this message is being sent. - this.parts = new HashSet<>(parts); - this.partsCntrs = partsCntrs; - - if (cp.historicalParts != null) - historicalParts = new HashSet<>(cp.historicalParts); + public GridDhtPartitionDemandMessage(GridDhtPartitionDemandLegacyMessage cp) { + grpId = cp.groupId(); + rebalanceId = cp.updateSequence(); + topic = cp.topic(); + timeout = cp.timeout(); + workerId = cp.workerId(); + topVer = cp.topologyVersion(); + + IgniteDhtDemandedPartitionsMap partMap = new IgniteDhtDemandedPartitionsMap(); + + if (cp.partitions() != null) { + for (Integer p : cp.partitions()) { + if (cp.isHistorical(p)) + partMap.addHistorical(p, 0, cp.partitionCounter(p), cp.partitions().size()); + else + partMap.addFull(p); + } + } + + partMap.historicalMap().trim(); + + parts = partMap; } /** @@ -114,52 +116,42 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage { } /** - * @param p Partition. + * Creates copy of this message with new partitions map. + * + * @param parts New partitions map. + * @return Copy of message with new partitions map. 
*/ - void addPartition(int p, boolean historical) { - if (parts == null) - parts = new HashSet<>(); - - parts.add(p); - - if (historical) { - if (historicalParts == null) - historicalParts = new HashSet<>(); - - historicalParts.add(p); - } + public GridDhtPartitionDemandMessage withNewPartitionsMap(@NotNull IgniteDhtDemandedPartitionsMap parts) { + GridDhtPartitionDemandMessage cp = new GridDhtPartitionDemandMessage(); + cp.grpId = grpId; + cp.rebalanceId = rebalanceId; + cp.topic = topic; + cp.timeout = timeout; + cp.workerId = workerId; + cp.topVer = topVer; + cp.parts = parts; + return cp; } /** * @return Partition. */ - Collection<Integer> partitions() { + IgniteDhtDemandedPartitionsMap partitions() { return parts; } /** - * @param p Partition to check. - * @return {@code True} if historical. - */ - boolean isHistorical(int p) { - if (historicalParts == null) - return false; - - return historicalParts.contains(p); - } - - /** * @param updateSeq Update sequence. */ - void updateSequence(long updateSeq) { - this.updateSeq = updateSeq; + void rebalanceId(long updateSeq) { + this.rebalanceId = updateSeq; } /** - * @return Update sequence. + * @return Unique rebalance session id. */ - long updateSequence() { - return updateSeq; + long rebalanceId() { + return rebalanceId; } /** @@ -205,18 +197,23 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage { } /** - * @param part Partition to get counter for. - * @return Partition counter associated with this partition or {@code null} if this information is unavailable. + * @return Topology version for which demand message is sent. */ - Long partitionCounter(int part) { - return partsCntrs == null ? null : partsCntrs.get(part); + @Override public AffinityTopologyVersion topologyVersion() { + return topVer; } /** - * @return Topology version for which demand message is sent. + * Converts message to its legacy version if necessary.
+ * + * @param target Target version + * @return Converted message or {@code this} if conversion isn't necessary. */ - @Override public AffinityTopologyVersion topologyVersion() { - return topVer; + public GridCacheMessage convertIfNeeded(IgniteProductVersion target) { + if (target.compareTo(VERSION_SINCE) <= 0) + return new GridDhtPartitionDemandLegacyMessage(this); + + return this; } /** {@inheritDoc} @@ -226,6 +223,9 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage { if (topic != null && topicBytes == null) topicBytes = U.marshal(ctx, topic); + + if (parts != null && partsBytes == null) + partsBytes = U.marshal(ctx, parts); } /** {@inheritDoc} */ @@ -234,6 +234,9 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage { if (topicBytes != null && topic == null) topic = U.unmarshal(ctx, topicBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + + if (partsBytes != null && parts == null) + parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); } /** {@inheritDoc} */ @@ -257,48 +260,36 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage { switch (writer.state()) { case 3: - if (!writer.writeCollection("historicalParts", historicalParts, MessageCollectionItemType.INT)) + if (!writer.writeByteArray("partsBytes", partsBytes)) return false; writer.incrementState(); case 4: - if (!writer.writeCollection("parts", parts, MessageCollectionItemType.INT)) + if (!writer.writeLong("timeout", timeout)) return false; writer.incrementState(); case 5: - if (!writer.writeMap("partsCntrs", partsCntrs, MessageCollectionItemType.INT, MessageCollectionItemType.LONG)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); case 6: - if (!writer.writeLong("timeout", timeout)) + if (!writer.writeByteArray("topicBytes", topicBytes)) return false; writer.incrementState(); case 7: - if (!writer.writeMessage("topVer", topVer)) + if 
(!writer.writeLong("rebalanceId", rebalanceId)) return false; writer.incrementState(); case 8: - if (!writer.writeByteArray("topicBytes", topicBytes)) - return false; - - writer.incrementState(); - - case 9: - if (!writer.writeLong("updateSeq", updateSeq)) - return false; - - writer.incrementState(); - - case 10: if (!writer.writeInt("workerId", workerId)) return false; @@ -321,7 +312,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage { switch (reader.state()) { case 3: - historicalParts = reader.readCollection("historicalParts", MessageCollectionItemType.INT); + partsBytes = reader.readByteArray("partsBytes"); if (!reader.isLastRead()) return false; @@ -329,7 +320,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage { reader.incrementState(); case 4: - parts = reader.readCollection("parts", MessageCollectionItemType.INT); + timeout = reader.readLong("timeout"); if (!reader.isLastRead()) return false; @@ -337,7 +328,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage { reader.incrementState(); case 5: - partsCntrs = reader.readMap("partsCntrs", MessageCollectionItemType.INT, MessageCollectionItemType.LONG, false); + topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) return false; @@ -345,7 +336,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage { reader.incrementState(); case 6: - timeout = reader.readLong("timeout"); + topicBytes = reader.readByteArray("topicBytes"); if (!reader.isLastRead()) return false; @@ -353,7 +344,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage { reader.incrementState(); case 7: - topVer = reader.readMessage("topVer"); + rebalanceId = reader.readLong("rebalanceId"); if (!reader.isLastRead()) return false; @@ -361,22 +352,6 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage { reader.incrementState(); case 8: - topicBytes = reader.readByteArray("topicBytes"); 
- - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 9: - updateSeq = reader.readLong("updateSeq"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 10: workerId = reader.readInt("workerId"); if (!reader.isLastRead()) @@ -391,12 +366,12 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage { /** {@inheritDoc} */ @Override public short directType() { - return 44; + return 45; } /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 11; + return 9; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/6dc5804a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index d09312f..734bbaa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -27,12 +26,14 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import 
org.apache.ignite.cache.CacheRebalanceMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; @@ -251,41 +252,40 @@ public class GridDhtPartitionDemander { } /** - * @param assigns Assignments. + * Initiates new rebalance process from given {@code assignments}. + * If previous rebalance is not finished method cancels it. + * In case of delayed rebalance method schedules new with configured delay. + * + * @param assignments Assignments. * @param force {@code True} if dummy reassign. - * @param cnt Counter. + * @param rebalanceId Rebalance id. * @param next Runnable responsible for cache rebalancing start. * @param forcedRebFut External future for forced rebalance. * @return Rebalancing runnable. */ Runnable addAssignments( - final GridDhtPreloaderAssignments assigns, + final GridDhtPreloaderAssignments assignments, boolean force, - int cnt, + long rebalanceId, final Runnable next, @Nullable final GridCompoundFuture<Boolean, Boolean> forcedRebFut ) { if (log.isDebugEnabled()) - log.debug("Adding partition assignments: " + assigns); + log.debug("Adding partition assignments: " + assignments); assert force == (forcedRebFut != null); long delay = grp.config().getRebalanceDelay(); - if ((delay == 0 || force) && assigns != null) { + if ((delay == 0 || force) && assignments != null) { final RebalanceFuture oldFut = rebalanceFut; - final RebalanceFuture fut = new RebalanceFuture(grp, assigns, log, cnt); + final RebalanceFuture fut = new RebalanceFuture(grp, assignments, log, rebalanceId); if (!oldFut.isInitial()) oldFut.cancel(); - else { - fut.listen(new CI1<IgniteInternalFuture<Boolean>>() { - @Override public void apply(IgniteInternalFuture<Boolean> fut) { - oldFut.onDone(fut.result()); 
- } - }); - } + else + fut.listen(f -> oldFut.onDone(f.result())); if (forcedRebFut != null) forcedRebFut.add(fut); @@ -300,17 +300,13 @@ public class GridDhtPartitionDemander { metrics.startRebalance(0); - rebalanceFut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() { - @Override public void apply(IgniteInternalFuture<Boolean> fut) { - metrics.clearRebalanceCounters(); - } - }); + rebalanceFut.listen(f -> metrics.clearRebalanceCounters()); } } fut.sendRebalanceStartedEvent(); - if (assigns.cancelled()) { // Pending exchange. + if (assignments.cancelled()) { // Pending exchange. if (log.isDebugEnabled()) log.debug("Rebalancing skipped due to cancelled assignments."); @@ -321,7 +317,7 @@ public class GridDhtPartitionDemander { return null; } - if (assigns.isEmpty()) { // Nothing to rebalance. + if (assignments.isEmpty()) { // Nothing to rebalance. if (log.isDebugEnabled()) log.debug("Rebalancing skipped due to empty assignments."); @@ -334,24 +330,20 @@ public class GridDhtPartitionDemander { return null; } - return new Runnable() { - @Override public void run() { - if (next != null) - fut.listen(new CI1<IgniteInternalFuture<Boolean>>() { - @Override public void apply(IgniteInternalFuture<Boolean> f) { - try { - if (f.get()) // Not cancelled. - next.run(); // Starts next cache rebalancing (according to the order). - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug(e.getMessage()); - } - } - }); + return () -> { + if (next != null) + fut.listen(f -> { + try { + if (f.get()) // Not cancelled. + next.run(); // Starts next cache rebalancing (according to the order). 
+ } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug(e.getMessage()); + } + }); - requestPartitions(fut, assigns); - } + requestPartitions(fut, assignments); }; } else if (delay > 0) { @@ -391,10 +383,20 @@ public class GridDhtPartitionDemander { } /** + * Asynchronously sends initial demand messages formed from {@code assignments} and initiates supply-demand processes. + * + * For each node participating in the rebalance process, the method distributes the set of partitions for that node across several stripes (topics). + * It means that each stripe containing a subset of partitions can be processed in parallel. + * The number of stripes is controlled by {@link IgniteConfiguration#getRebalanceThreadPoolSize()} property. + * + * Partitions that can be rebalanced using only WAL are called historical, others are called full. + * + * Before sending messages, method awaits partitions clearing for full partitions. + * * @param fut Rebalance future. - * @param assigns Assignments. + * @param assignments Assignments. */ - private void requestPartitions(final RebalanceFuture fut, GridDhtPreloaderAssignments assigns) { + private void requestPartitions(final RebalanceFuture fut, GridDhtPreloaderAssignments assignments) { assert fut != null; if (topologyChanged(fut)) { @@ -418,10 +420,10 @@ public class GridDhtPartitionDemander { // Must add all remaining node before send first request, for avoid race between add remaining node and // processing response, see checkIsDone(boolean). 
- for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) { + for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assignments.entrySet()) { UUID nodeId = e.getKey().id(); - Collection<Integer> parts= e.getValue().partitions(); + IgniteDhtDemandedPartitionsMap parts = e.getValue().partitions(); assert parts != null : "Partitions are null [grp=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId + "]"; @@ -431,193 +433,217 @@ public class GridDhtPartitionDemander { final CacheConfiguration cfg = grp.config(); - int lsnrCnt = ctx.gridConfig().getRebalanceThreadPoolSize(); - - for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) { + for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assignments.entrySet()) { final ClusterNode node = e.getKey(); GridDhtPartitionDemandMessage d = e.getValue(); - final Collection<Integer> parts = d.partitions(); - - U.log(log, "Starting rebalancing [mode=" + cfg.getRebalanceMode() + - ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() + - ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]"); + final IgniteDhtDemandedPartitionsMap parts; + synchronized (fut) { // Synchronized to prevent consistency issues in case of parallel cancellation. 
+ if (fut.isDone()) + break; - final List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt); + parts = fut.remaining.get(node.id()).get2(); - for (int cnt = 0; cnt < lsnrCnt; cnt++) - sParts.add(new HashSet<Integer>()); + U.log(log, "Starting rebalancing [grp=" + grp.cacheOrGroupName() + + ", mode=" + cfg.getRebalanceMode() + ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() + + ", topology=" + fut.topologyVersion() + ", rebalanceId=" + fut.rebalanceId + "]"); + } - Iterator<Integer> it = parts.iterator(); + int stripes = ctx.gridConfig().getRebalanceThreadPoolSize(); - int cnt = 0; + final List<IgniteDhtDemandedPartitionsMap> stripePartitions = new ArrayList<>(stripes); + for (int i = 0; i < stripes; i++) + stripePartitions.add(new IgniteDhtDemandedPartitionsMap()); - while (it.hasNext()) - sParts.get(cnt++ % lsnrCnt).add(it.next()); + // Reserve one stripe for historical partitions. + if (parts.hasHistorical()) { + stripePartitions.add(stripes - 1, new IgniteDhtDemandedPartitionsMap(parts.historicalMap(), null)); - for (cnt = 0; cnt < lsnrCnt; cnt++) { - if (!sParts.get(cnt).isEmpty()) { - // Create copy. - final GridDhtPartitionDemandMessage initD = createDemandMessage(d, sParts.get(cnt)); + if (stripes > 1) + stripes--; + } - initD.topic(rebalanceTopics.get(cnt)); - initD.updateSequence(fut.updateSeq); - initD.timeout(cfg.getRebalanceTimeout()); + // Distribute full partitions across other stripes. + Iterator<Integer> it = parts.fullSet().iterator(); + for (int i = 0; it.hasNext(); i++) + stripePartitions.get(i % stripes).addFull(it.next()); - final int finalCnt = cnt; + for (int stripe = 0; stripe < stripes; stripe++) { + if (!stripePartitions.get(stripe).isEmpty()) { + // Create copy of demand message with new striped partitions map. 
+ final GridDhtPartitionDemandMessage demandMsg = d.withNewPartitionsMap(stripePartitions.get(stripe)); - ctx.kernalContext().closure().runLocalSafe(new Runnable() { - @Override public void run() { - try { - if (!fut.isDone()) { - ctx.io().sendOrderedMessage(node, rebalanceTopics.get(finalCnt), initD, grp.ioPolicy(), initD.timeout()); + demandMsg.topic(rebalanceTopics.get(stripe)); + demandMsg.rebalanceId(fut.rebalanceId); + demandMsg.timeout(cfg.getRebalanceTimeout()); - // Cleanup required in case partitions demanded in parallel with cancellation. - synchronized (fut) { - if (fut.isDone()) - fut.cleanupRemoteContexts(node.id()); - } + final int topicId = stripe; - if (log.isDebugEnabled()) - log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + - finalCnt + ", partitions count=" + sParts.get(finalCnt).size() + - " (" + partitionsList(sParts.get(finalCnt)) + ")]"); + Runnable initDemandRequestTask = () -> { + try { + if (!fut.isDone()) { + ctx.io().sendOrderedMessage(node, rebalanceTopics.get(topicId), + demandMsg.convertIfNeeded(node.version()), grp.ioPolicy(), demandMsg.timeout()); + + // Cleanup required in case partitions demanded in parallel with cancellation. + synchronized (fut) { + if (fut.isDone()) + fut.cleanupRemoteContexts(node.id()); } - } - catch (IgniteCheckedException e) { - ClusterTopologyCheckedException cause = e.getCause(ClusterTopologyCheckedException.class); - if (cause != null) - log.warning("Failed to send initial demand request to node. 
" + e.getMessage()); - else - log.error("Failed to send initial demand request to node.", e); - - fut.cancel(); - } - catch (Throwable th) { - log.error("Runtime error caught during initial demand request sending.", th); - - fut.cancel(); + if (log.isDebugEnabled()) + log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + + topicId + ", partitions count=" + stripePartitions.get(topicId).size() + + " (" + stripePartitions.get(topicId).partitionsList() + ")]"); } } - }, /*system pool*/true); - } - } - } - } + catch (IgniteCheckedException e1) { + ClusterTopologyCheckedException cause = e1.getCause(ClusterTopologyCheckedException.class); - /** - * @param old Old message. - * @param parts Partitions to demand. - * @return New demand message. - */ - private GridDhtPartitionDemandMessage createDemandMessage(GridDhtPartitionDemandMessage old, - Collection<Integer> parts) { - Map<Integer, Long> partCntrs = null; + if (cause != null) + log.warning("Failed to send initial demand request to node. " + e1.getMessage()); + else + log.error("Failed to send initial demand request to node.", e1); - for (Integer part : parts) { - try { - if (grp.persistenceEnabled()) { - if (partCntrs == null) - partCntrs = new HashMap<>(parts.size(), 1.0f); + fut.cancel(); + } + catch (Throwable th) { + log.error("Runtime error caught during initial demand request sending.", th); - GridDhtLocalPartition p = grp.topology().localPartition(part, old.topologyVersion(), false); + fut.cancel(); + } + }; - partCntrs.put(part, p.initialUpdateCounter()); + awaitClearingAndStartRebalance(fut, demandMsg, initDemandRequestTask); } } - catch (GridDhtInvalidPartitionException ignore) { - // Skip this partition. - } } - - return new GridDhtPartitionDemandMessage(old, parts, partCntrs); } /** - * @param c Partitions. - * @return String representation of partitions list. 
+ * Awaits partitions clearing for full partitions and sends initial demand request + * after all partitions are cleared and safe to consume data. + * + * @param fut Rebalance future. + * @param demandMessage Initial demand message which contains set of full partitions to await. + * @param initDemandRequestTask Task which sends initial demand request. */ - private String partitionsList(Collection<Integer> c) { - List<Integer> s = new ArrayList<>(c); + private void awaitClearingAndStartRebalance(RebalanceFuture fut, + GridDhtPartitionDemandMessage demandMessage, + Runnable initDemandRequestTask) { + Set<Integer> fullPartitions = demandMessage.partitions().fullSet(); - Collections.sort(s); + if (fullPartitions.isEmpty()) { + ctx.kernalContext().closure().runLocalSafe(initDemandRequestTask, true); - StringBuilder sb = new StringBuilder(); + return; + } - int start = -1; + for (GridCacheContext cctx : grp.caches()) { + if (cctx.statisticsEnabled()) { + final CacheMetricsImpl metrics = cctx.cache().metrics0(); - int prev = -1; + metrics.rebalanceClearingPartitions(fullPartitions.size()); + } + } - Iterator<Integer> sit = s.iterator(); + final AtomicInteger clearingPartitions = new AtomicInteger(fullPartitions.size()); - while (sit.hasNext()) { - int p = sit.next(); + for (int partId : fullPartitions) { + if (fut.isDone()) + return; - if (start == -1) { - start = p; - prev = p; - } + GridDhtLocalPartition part = grp.topology().localPartition(partId); - if (prev < p - 1) { - sb.append(start); + if (part != null && part.state() == MOVING) { + part.onClearFinished(f -> { + // Cancel rebalance if partition clearing failed. 
+ if (f.error() != null) { + if (!fut.isDone()) { + for (GridCacheContext cctx : grp.caches()) { + if (cctx.statisticsEnabled()) { + final CacheMetricsImpl metrics = cctx.cache().metrics0(); - if (start != prev) - sb.append("-").append(prev); + metrics.rebalanceClearingPartitions(0); + } + } - sb.append(", "); + log.error("Unable to await partition clearing " + part, f.error()); - start = p; - } + fut.cancel(); + } + } + else { + if (!fut.isDone()) { + int existed = clearingPartitions.decrementAndGet(); - if (!sit.hasNext()) { - sb.append(start); + for (GridCacheContext cctx : grp.caches()) { + if (cctx.statisticsEnabled()) { + final CacheMetricsImpl metrics = cctx.cache().metrics0(); - if (start != p) - sb.append("-").append(p); - } + metrics.rebalanceClearingPartitions(existed); + } + } - prev = p; + // If all partitions are cleared send initial demand message. + if (existed == 0) + ctx.kernalContext().closure().runLocalSafe(initDemandRequestTask, true); + } + } + }); + } + else + clearingPartitions.decrementAndGet(); } - - return sb.toString(); } /** - * @param idx Index. - * @param id Node id. - * @param supply Supply. + * Handles supply message from {@code nodeId} with specified {@code topicId}. + * + * Supply message contains entries to populate rebalancing partitions. + * + * There is a cyclic process: + * Populate rebalancing partitions with entries from Supply message. + * If not all partitions specified in {@link #rebalanceFut} were rebalanced or marked as missed + * send new Demand message to request next batch of entries. + * + * @param topicId Topic id. + * @param nodeId Node id. + * @param supply Supply message. 
*/ public void handleSupplyMessage( - int idx, - final UUID id, + int topicId, + final UUID nodeId, final GridDhtPartitionSupplyMessage supply ) { AffinityTopologyVersion topVer = supply.topologyVersion(); final RebalanceFuture fut = rebalanceFut; - ClusterNode node = ctx.node(id); + ClusterNode node = ctx.node(nodeId); if (node == null) return; - if (!fut.isActual(supply.updateSequence())) // Current future have another update sequence. - return; // Supple message based on another future. - if (topologyChanged(fut)) // Topology already changed (for the future that supply message based on). return; + if (!fut.isActual(supply.rebalanceId())) { + // Current future has another rebalance id. + // Supply message is based on another future. + return; + } + if (log.isDebugEnabled()) log.debug("Received supply message [grp=" + grp.cacheOrGroupName() + ", msg=" + supply + ']'); // Check whether there were class loading errors on unmarshal if (supply.classError() != null) { - U.warn(log, "Rebalancing from node cancelled [grp=" + grp.cacheOrGroupName() + ", node=" + id + + U.warn(log, "Rebalancing from node cancelled [grp=" + grp.cacheOrGroupName() + ", node=" + nodeId + "]. Class got undeployed during preloading: " + supply.classError()); - fut.cancel(id); + fut.cancel(nodeId); return; } @@ -686,19 +712,16 @@ public class GridDhtPartitionDemander { if (grp.sharedGroup() && (cctx == null || cctx.cacheId() != entry.cacheId())) cctx = ctx.cacheContext(entry.cacheId()); - if(cctx != null && cctx.statisticsEnabled()) + if (cctx != null && cctx.statisticsEnabled()) cctx.cache().metrics0().onRebalanceKeyReceived(); } // If message was last for this partition, // then we take ownership. 
if (last) { - if (supply.isClean(p)) - part.updateCounter(supply.last().get(p)); - top.own(part); - fut.partitionDone(id, p); + fut.partitionDone(nodeId, p); if (log.isDebugEnabled()) log.debug("Finished rebalancing partition: " + part); @@ -711,14 +734,14 @@ public class GridDhtPartitionDemander { } else { if (last) - fut.partitionDone(id, p); + fut.partitionDone(nodeId, p); if (log.isDebugEnabled()) log.debug("Skipping rebalancing partition (state is not MOVING): " + part); } } else { - fut.partitionDone(id, p); + fut.partitionDone(nodeId, p); if (log.isDebugEnabled()) log.debug("Skipping rebalancing partition (it does not belong on current node): " + p); @@ -728,26 +751,26 @@ public class GridDhtPartitionDemander { // Only request partitions based on latest topology version. for (Integer miss : supply.missed()) { if (aff.get(miss).contains(ctx.localNode())) - fut.partitionMissed(id, miss); + fut.partitionMissed(nodeId, miss); } for (Integer miss : supply.missed()) - fut.partitionDone(id, miss); + fut.partitionDone(nodeId, miss); GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage( - supply.updateSequence(), + supply.rebalanceId(), supply.topologyVersion(), grp.groupId()); d.timeout(grp.config().getRebalanceTimeout()); - d.topic(rebalanceTopics.get(idx)); + d.topic(rebalanceTopics.get(topicId)); if (!topologyChanged(fut) && !fut.isDone()) { // Send demand message. try { - ctx.io().sendOrderedMessage(node, rebalanceTopics.get(idx), - d, grp.ioPolicy(), grp.config().getRebalanceTimeout()); + ctx.io().sendOrderedMessage(node, rebalanceTopics.get(topicId), + d.convertIfNeeded(node.version()), grp.ioPolicy(), grp.config().getRebalanceTimeout()); } catch (ClusterTopologyCheckedException e) { if (log.isDebugEnabled()) { @@ -765,15 +788,17 @@ public class GridDhtPartitionDemander { } /** - * @param pick Node picked for preloading. - * @param p Partition. + * Adds {@code entry} to partition {@code p}. + * + * @param from Node which sent entry. 
+ * @param p Partition id. * @param entry Preloaded entry. * @param topVer Topology version. * @return {@code False} if partition has become invalid during preloading. * @throws IgniteInterruptedCheckedException If interrupted. */ private boolean preloadEntry( - ClusterNode pick, + ClusterNode from, int p, GridCacheEntryInfo entry, AffinityTopologyVersion topVer @@ -789,7 +814,7 @@ public class GridDhtPartitionDemander { cached = cctx.dhtCache().entryEx(entry.key()); if (log.isDebugEnabled()) - log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']'); + log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + from.id() + ']'); cctx.shared().database().checkpointReadLock(); @@ -844,7 +869,7 @@ public class GridDhtPartitionDemander { } catch (IgniteCheckedException e) { throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" + - ctx.localNode() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e); + ctx.localNode() + ", node=" + from.id() + ", key=" + entry.key() + ", part=" + p + ']', e); } finally { ctx.database().checkpointReadUnlock(); @@ -872,7 +897,7 @@ public class GridDhtPartitionDemander { private final IgniteLogger log; /** Remaining. T2: startTime, partitions */ - private final Map<UUID, T2<Long, Collection<Integer>>> remaining = new HashMap<>(); + private final Map<UUID, T2<Long, IgniteDhtDemandedPartitionsMap>> remaining = new HashMap<>(); /** Missed. */ private final Map<UUID, Collection<Integer>> missed = new HashMap<>(); @@ -884,28 +909,28 @@ public class GridDhtPartitionDemander { /** Topology version. */ private final AffinityTopologyVersion topVer; - /** Unique (per demander) sequence id. */ - private final long updateSeq; + /** Unique (per demander) rebalance id. */ + private final long rebalanceId; /** * @param grp Cache group. - * @param assigns Assigns. + * @param assignments Assignments. * @param log Logger. 
- * @param updateSeq Update sequence. + * @param rebalanceId Rebalance id. */ RebalanceFuture( CacheGroupContext grp, - GridDhtPreloaderAssignments assigns, + GridDhtPreloaderAssignments assignments, IgniteLogger log, - long updateSeq) { - assert assigns != null; + long rebalanceId) { + assert assignments != null; - exchId = assigns.exchangeId(); - topVer = assigns.topologyVersion(); + exchId = assignments.exchangeId(); + topVer = assignments.topologyVersion(); this.grp = grp; this.log = log; - this.updateSeq = updateSeq; + this.rebalanceId = rebalanceId; ctx = grp.shared(); } @@ -919,7 +944,7 @@ public class GridDhtPartitionDemander { this.ctx = null; this.grp = null; this.log = null; - this.updateSeq = -1; + this.rebalanceId = -1; } /** @@ -930,11 +955,11 @@ public class GridDhtPartitionDemander { } /** - * @param updateSeq Update sequence. - * @return true in case future created for specified updateSeq, false in other case. + * @param rebalanceId Rebalance id. + * @return true in case future created for specified {@code rebalanceId}, false in other case. */ - private boolean isActual(long updateSeq) { - return this.updateSeq == updateSeq; + private boolean isActual(long rebalanceId) { + return this.rebalanceId == rebalanceId; } /** @@ -989,20 +1014,18 @@ public class GridDhtPartitionDemander { checkIsDone(); // But will finish syncFuture only when other nodes are preloaded or rebalancing cancelled. } - } /** * @param nodeId Node id. - * @param p P. + * @param p Partition id. 
*/ private void partitionMissed(UUID nodeId, int p) { synchronized (this) { if (isDone()) return; - if (missed.get(nodeId) == null) - missed.put(nodeId, new HashSet<Integer>()); + missed.computeIfAbsent(nodeId, k -> new HashSet<>()); missed.get(nodeId).add(p); } @@ -1018,7 +1041,9 @@ public class GridDhtPartitionDemander { return; GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage( - -1/* remove supply context signal */, + // Negative number of id signals that supply context + // with the same positive id must be cleaned up at the supply node. + -rebalanceId, this.topologyVersion(), grp.groupId()); @@ -1029,7 +1054,7 @@ public class GridDhtPartitionDemander { d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx)); ctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx), - d, grp.ioPolicy(), grp.config().getRebalanceTimeout()); + d.convertIfNeeded(node.version()), grp.ioPolicy(), grp.config().getRebalanceTimeout()); } } catch (IgniteCheckedException ignored) { @@ -1040,7 +1065,7 @@ public class GridDhtPartitionDemander { /** * @param nodeId Node id. - * @param p P. + * @param p Partition number. 
*/ private void partitionDone(UUID nodeId, int p) { synchronized (this) { @@ -1050,23 +1075,23 @@ public class GridDhtPartitionDemander { if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_LOADED)) rebalanceEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, exchId.discoveryEvent()); - T2<Long, Collection<Integer>> t = remaining.get(nodeId); + T2<Long, IgniteDhtDemandedPartitionsMap> t = remaining.get(nodeId); - assert t != null : "Remaining not found [grp=" + grp.name() + ", fromNode=" + nodeId + + assert t != null : "Remaining not found [grp=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId + ", part=" + p + "]"; - Collection<Integer> parts = t.get2(); + IgniteDhtDemandedPartitionsMap parts = t.get2(); boolean rmvd = parts.remove(p); - assert rmvd : "Partition already done [grp=" + grp.name() + ", fromNode=" + nodeId + + assert rmvd : "Partition already done [grp=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId + ", part=" + p + ", left=" + parts + "]"; if (parts.isEmpty()) { U.log(log, "Completed " + ((remaining.size() == 1 ? "(final) " : "") + - "rebalancing [fromNode=" + nodeId + - ", cacheOrGroup=" + grp.cacheOrGroupName() + - ", topology=" + topologyVersion() + + "rebalancing [fromNode=" + nodeId + + ", cacheOrGroup=" + grp.cacheOrGroupName() + + ", topology=" + topologyVersion() + ", time=" + (U.currentTimeMillis() - t.get1()) + " ms]")); remaining.remove(nodeId); @@ -1109,8 +1134,8 @@ public class GridDhtPartitionDemander { if (remaining.isEmpty()) { sendRebalanceFinishedEvent(); - if (log.isDebugEnabled()) - log.debug("Completed rebalance future: " + this); + if (log.isInfoEnabled()) + log.info("Completed rebalance future: " + this); ctx.exchange().scheduleResendPartitions();