Mmuzaf commented on a change in pull request #6951: Ignite 11073 12069 P2P Rebalance collaboration work
URL: https://github.com/apache/ignite/pull/6951#discussion_r410559588
 
 

 ##########
 File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionPreloadingRoutine.java
 ##########
 @@ -0,0 +1,566 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lifecycle.LifecycleAware;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Partition File preloading routine.
+ */
+public class PartitionPreloadingRoutine extends GridFutureAdapter<Boolean> {
+    /** Rebalance topology version. */
+    private final AffinityTopologyVersion topVer;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Cache shared context. */
+    private final GridCacheSharedContext cctx;
+
+    /** Unique (per demander) rebalance id. */
+    private final long rebalanceId;
+
+    /** Lock. */
+    private final ReentrantLock lock = new ReentrantLock();
+
+    /** Partition restore handler. */
+    private final PartitionRestoreHandler restoreHnd;
+
+    /** Exchange ID. */
+    private final GridDhtPartitionExchangeId exchId;
+
+    /** Remaining nodes mapped to cache group IDs and the sets of partitions still to be preloaded. */
+    @GridToStringInclude
+    private final Map<UUID, Map<Integer, Set<Integer>>> remaining;
+
+    /** Count of partition snapshots received. */
+    private final AtomicLong receivedCnt = new AtomicLong();
+
+    /** Restored partition snapshots with the HWM update counter value, mapped by cache group ID and node ID. */
+    @GridToStringInclude
+    private final Map<Integer, Map<UUID, Map<Integer, Long>>> restored = new ConcurrentHashMap<>();
+
+    /**
+     * Cache group identifiers mapped to historical-assignment futures that will be completed when the partition
+     * files are preloaded.
+     */
+    private final Map<Integer, GridFutureAdapter<GridDhtPreloaderAssignments>> futAssigns = new ConcurrentHashMap<>();
+
+    /** Total number of partitions. */
+    private final long totalPartitionsCnt;
+
+    /** Snapshot future. */
+    private IgniteInternalFuture<Boolean> snapshotFut;
+
+    /**
+     * @param exchFut Exchange future.
+     * @param cctx Cache shared context.
+     * @param rebalanceId Rebalance ID.
+     * @param assignments Assignments mapped by node ID.
+     */
+    public PartitionPreloadingRoutine(
+        GridDhtPartitionsExchangeFuture exchFut,
+        GridCacheSharedContext cctx,
+        long rebalanceId,
+        Map<UUID, Map<Integer, Set<Integer>>> assignments
+    ) {
+        long totalParts = 0;
+
+        // Copy contents.
+        Map<UUID, Map<Integer, Set<Integer>>> remaining0 = U.newHashMap(assignments.size());
+
+        for (Map.Entry<UUID, Map<Integer, Set<Integer>>> nodeAssign : assignments.entrySet()) {
+            Map<Integer, Set<Integer>> nodeAssign0 = new ConcurrentHashMap<>(nodeAssign.getValue().size());
+
+            remaining0.put(nodeAssign.getKey(), nodeAssign0);
+
+            for (Map.Entry<Integer, Set<Integer>> grpAssign : nodeAssign.getValue().entrySet()) {
+                nodeAssign0.put(grpAssign.getKey(), new GridConcurrentHashSet<>(grpAssign.getValue()));
+                futAssigns.put(grpAssign.getKey(), new GridFutureAdapter<>());
+
+                totalParts += grpAssign.getValue().size();
+            }
+        }
+
+        this.cctx = cctx;
+        this.rebalanceId = rebalanceId;
+
+        exchId = exchFut.exchangeId();
+        topVer = exchFut.topologyVersion();
+        log = cctx.kernalContext().log(getClass());
+        totalPartitionsCnt = totalParts;
+        remaining = Collections.unmodifiableMap(remaining0);
+
+        restoreHnd = new PartitionRestoreHandler(cctx);
+    }
+
+    /**
+     * Start partitions preloading.
+     *
+     * @return Cache group identifiers with futures that will be completed when partitions are preloaded.
+     */
+    public Map<Integer, IgniteInternalFuture<GridDhtPreloaderAssignments>> startPartitionsPreloading() {
+        assert !remaining.isEmpty();
+
+        restoreHnd.start();
+
+        requestPartitionsSnapshot(remaining.entrySet().iterator());
+
+        return Collections.unmodifiableMap(futAssigns);
+    }
+
+    /**
+     * @param it Iterator on node assignments.
+     */
+    private void requestPartitionsSnapshot(Iterator<Map.Entry<UUID, Map<Integer, Set<Integer>>>> it) {
+        if (!it.hasNext())
+            return;
+
+        Map.Entry<UUID, Map<Integer, Set<Integer>>> nodeAssigns = it.next();
+
+        UUID nodeId = nodeAssigns.getKey();
+        Map<Integer, Set<Integer>> assigns = nodeAssigns.getValue();
+
+        Set<String> currGroups = new HashSet<>();
+
+        for (Integer grpId : assigns.keySet()) {
+            CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
+
+            currGroups.add(grp.cacheOrGroupName());
+        }
+
+        lock.lock();
+
+        try {
+            if (isDone())
+                return;
+
+            if (log.isInfoEnabled())
+                log.info("Preloading partition files [supplier=" + nodeId + ", groups=" + currGroups + "]");
+
+            assert snapshotFut == null || snapshotFut.isDone() : snapshotFut;
+
+            (snapshotFut = cctx.snapshotMgr()
+                .createRemoteSnapshot(nodeId,
+                    assigns,
+                    (file, uniquePartId) -> onPartitionSnapshotReceived(nodeId, uniquePartId, file)))
+                .chain(f -> {
+                        try {
+                            if (!f.isCancelled() && f.get())
+                                requestPartitionsSnapshot(it);
+                        }
+                        catch (IgniteCheckedException e) {
+                            if (!onDone(e) && log.isDebugEnabled())
+                                log.debug("Stale error (ignored): " + e.getMessage());
+                        }
+
+                        return null;
+                    }
+                );
+        }
+        finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * @return Set of identifiers of the remaining groups.
+     */
+    public Set<Integer> remainingGroups() {
+        return futAssigns.keySet();
+    }
+
+    /**
+     * @param nodeId Node id.
+     * @param uniquePartId Pair of cache group ID with partition ID.
+     * @param file Partition snapshot file.
+     */
+    public void onPartitionSnapshotReceived(UUID nodeId, GroupPartitionId uniquePartId, File file) {
+        int grpId = uniquePartId.getGroupId();
+        int partId = uniquePartId.getPartitionId();
+
+        CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
+
+        if (grp == null) {
+            log.warning("Partition snapshot initialization skipped, cache group not found [grpId=" + grpId + "]");
+
+            return;
+        }
+
+        lock.lock();
+
+        try {
+            // Ensure that we are not stopping when getting checkpoint read lock.
+            if (isDone())
+                return;
+
+            GridDhtLocalPartition part = grp.topology().localPartition(partId);
+
+            assert part != null;
+
+            IgniteInternalFuture<Long> restoreFut = restoreHnd.schedule(grp, part, file);
+
+            restoreFut.listen(f -> {
+                try {
+                    if (!f.isCancelled())
+                        onPartitionSnapshotRestored(nodeId, grpId, partId, f.get());
+                }
+                catch (IgniteCheckedException e) {
+                    log.error("Unable to restore partition snapshot [grpId=" + grpId + ", p=" + partId + "]");
+
+                    onDone(e);
+                }
+            });
+        } catch (IgniteCheckedException e) {
+            log.error("Unable to restore partition snapshot " +
+                "[grp=" + grp.cacheOrGroupName() +
+                ", p=" + partId +
+                ", file=" + file + "]", e);
+
+            onDone(e);
+        } finally {
+            lock.unlock();
+        }
+
+        if (receivedCnt.incrementAndGet() == totalPartitionsCnt) {
+            if (log.isInfoEnabled())
+                log.info("All partition files are received - triggering checkpoint to complete rebalancing.");
+
+            cctx.database().wakeupForCheckpoint("Partition files preload complete.");
+        }
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param grpId Cache group ID.
+     * @param partId Partition ID.
+     * @param cntr The highest value of the update counter before this partition began to process updates.
+     */
+    private void onPartitionSnapshotRestored(UUID nodeId, int grpId, int partId, long cntr) {
+        Map<Integer, Set<Integer>> grpParts = remaining.get(nodeId);
+
+        assert grpParts != null : "nodeId=" + nodeId + ", grpId=" + grpId + ", p=" + partId;
+
+        Set<Integer> parts = grpParts.get(grpId);
+
+        boolean rmvd = parts.remove(partId);
+
+        assert rmvd : "nodeId=" + nodeId + ", grpId=" + grpId + ", p=" + partId;
+
+        Map<UUID, Map<Integer, Long>> grpCntrs = restored.computeIfAbsent(grpId, v -> new ConcurrentHashMap<>());
+
+        grpCntrs.computeIfAbsent(nodeId, v -> new ConcurrentHashMap<>()).put(partId, cntr);
+
+        GridFutureAdapter<GridDhtPreloaderAssignments> fut;
+
+        if (!parts.isEmpty() ||
+            grpParts.remove(grpId) == null ||
+            remaining.values().stream().map(Map::keySet).anyMatch(grps -> grps.contains(grpId)) ||
+            (fut = futAssigns.remove(grpId)) == null)
+            return;
+
+        CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
+
+        assert !grp.localWalEnabled() : "grp=" + grp.cacheOrGroupName();
+
+        IgniteInternalFuture<?> idxFut = cctx.database().rebuildIndexes(grp);
+
+        GridDhtPreloaderAssignments histAssignments = makeHistoricalAssignments(grp, grpCntrs);
+
+        fut.onDone(histAssignments);
+
+        if (histAssignments.isEmpty())
+            idxFut.listen(f -> cctx.walState().onGroupRebalanceFinished(grp.groupId(), topVer));
+
+        boolean finalPreloading = futAssigns.isEmpty() && onDone(true);
+
+        if (log.isInfoEnabled()) {
+            log.info("Completed" + (finalPreloading ? " (final)" : "") +
+                " partition files preloading [grp=" + grp.cacheOrGroupName() + "]");
+        }
+    }
+
+    /**
+     * Prepare assignments for historical rebalancing.
+     *
+     * @param grp Cache group.
+     * @param cntrs Per-node map of partitions with the HWM update counter value for historical rebalance.
+     * @return Partition to node assignments.
+     */
+    private GridDhtPreloaderAssignments makeHistoricalAssignments(
+        CacheGroupContext grp,
+        Map<UUID, Map<Integer, Long>> cntrs
+    ) {
+        GridDhtPreloaderAssignments histAssigns = new GridDhtPreloaderAssignments(exchId, topVer);
+
+        int parts = grp.topology().partitions();
+
+        for (Map.Entry<UUID, Map<Integer, Long>> e : cntrs.entrySet()) {
+            ClusterNode node = cctx.discovery().node(e.getKey());
+
+            assert node != null : e.getKey();
+
+            Map<Integer, Long> orderedCntrs = new TreeMap<>(e.getValue());
+
+            for (Map.Entry<Integer, Long> partCntr : orderedCntrs.entrySet()) {
+                int partId = partCntr.getKey();
+
+                long from = grp.topology().localPartition(partId).initialUpdateCounter();
+                long to = partCntr.getValue();
+
+                if (from == to)
+                    continue;
+
+                assert to > from : "from=" + from + ", to=" + to;
+
+                GridDhtPartitionDemandMessage msg = histAssigns.
+                    computeIfAbsent(node, v -> new GridDhtPartitionDemandMessage(rebalanceId, topVer, grp.groupId()));
+
+                msg.partitions().addHistorical(partId, from, to, parts);
 
 Review comment:
   It seems we don't need to create a full `GridDhtPartitionDemandMessage` here. Can we build only an `IgniteDhtDemandedPartitionsMap` and assign it to the message later? That way you would convert a `Map<Integer, Long>` into an `IgniteDhtDemandedPartitionsMap` for each node.
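   
   A rough, untested sketch of what I mean, reusing the locals of `makeHistoricalAssignments` (`cntrs`, `grp`, `parts`, `histAssigns`); how the pre-built map gets attached to the demand message afterwards is an assumption and only hinted at in a comment:
   
   ```java
   // Collect the historical demand per node into IgniteDhtDemandedPartitionsMap first,
   // without creating the demand messages up front.
   Map<UUID, IgniteDhtDemandedPartitionsMap> demandedByNode = new HashMap<>();
   
   for (Map.Entry<UUID, Map<Integer, Long>> e : cntrs.entrySet()) {
       IgniteDhtDemandedPartitionsMap partsMap = new IgniteDhtDemandedPartitionsMap();
   
       for (Map.Entry<Integer, Long> partCntr : new TreeMap<>(e.getValue()).entrySet()) {
           int partId = partCntr.getKey();
   
           long from = grp.topology().localPartition(partId).initialUpdateCounter();
           long to = partCntr.getValue();
   
           if (from == to)
               continue;
   
           partsMap.addHistorical(partId, from, to, parts);
       }
   
       // Keep only nodes that actually have something to demand historically.
       if (!partsMap.isEmpty())
           demandedByNode.put(e.getKey(), partsMap);
   }
   
   // Later, when a message is actually required (assumption: a message can be built from a
   // pre-assembled IgniteDhtDemandedPartitionsMap, e.g. via a hypothetical helper demandMessageFor(partsMap)):
   // demandedByNode.forEach((nodeId, partsMap) ->
   //     histAssigns.put(cctx.discovery().node(nodeId), demandMessageFor(partsMap)));
   ```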

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
