Mmuzaf commented on a change in pull request #6951: Ignite 11073 12069 P2P 
Rebalance collaboration work
URL: https://github.com/apache/ignite/pull/6951#discussion_r410711862
 
 

 ##########
 File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionPreloadingRoutine.java
 ##########
 @@ -0,0 +1,566 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import 
org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lifecycle.LifecycleAware;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Partition File preloading routine.
+ */
+public class PartitionPreloadingRoutine extends GridFutureAdapter<Boolean> {
+    /** Rebalance topology version. */
+    private final AffinityTopologyVersion topVer;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Cache shared context. */
+    private final GridCacheSharedContext cctx;
+
+    /** Unique (per demander) rebalance id. */
+    private final long rebalanceId;
+
+    /** Lock. */
+    private final ReentrantLock lock = new ReentrantLock();
+
+    /** Partition restore handler. */
+    private final PartitionRestoreHandler restoreHnd;
+
+    /** Exchange ID. */
+    private final GridDhtPartitionExchangeId exchId;
+
+    /** Remaining nodes with groups and the number of partitions. */
+    @GridToStringInclude
+    private final Map<UUID, Map<Integer, Set<Integer>>> remaining;
+
+    /** Count of partition snapshots received. */
+    private final AtomicLong receivedCnt = new AtomicLong();
+
+    /** Cache group with restored partition snapshots and HWM value of update 
counter mapped to node identifier. */
+    @GridToStringInclude
+    private final Map<Integer, Map<UUID, Map<Integer, Long>>> restored = new 
ConcurrentHashMap<>();
+
+    /**
+     * Cache group identifiers with historical assignments future that will be 
completed when partition files are
+     * preloaded.
+     */
+    private final Map<Integer, GridFutureAdapter<GridDhtPreloaderAssignments>> 
futAssigns = new ConcurrentHashMap<>();
+
+    /** Total number of partitions. */
+    private final long totalPartitionsCnt;
+
+    /** Snapshot future. */
+    private IgniteInternalFuture<Boolean> snapshotFut;
+
+    /**
+     * @param exchFut Exchange future.
+     * @param cctx Cache shared context.
+     * @param rebalanceId Rebalance ID
+     * @param assignments Assignments mapped by node ID.
+     */
+    public PartitionPreloadingRoutine(
+        GridDhtPartitionsExchangeFuture exchFut,
+        GridCacheSharedContext cctx,
+        long rebalanceId,
+        Map<UUID, Map<Integer, Set<Integer>>> assignments
+    ) {
+        long totalParts = 0;
+
+        // Copy contents.
+        Map<UUID, Map<Integer, Set<Integer>>> remaining0 = 
U.newHashMap(assignments.size());
+
+        for (Map.Entry<UUID, Map<Integer, Set<Integer>>> nodeAssign : 
assignments.entrySet()) {
+            Map<Integer, Set<Integer>> nodeAssign0 = new 
ConcurrentHashMap<>(nodeAssign.getValue().size());
+
+            remaining0.put(nodeAssign.getKey(), nodeAssign0);
+
+            for (Map.Entry<Integer, Set<Integer>> grpAssign : 
nodeAssign.getValue().entrySet()) {
+                nodeAssign0.put(grpAssign.getKey(), new 
GridConcurrentHashSet<>(grpAssign.getValue()));
+                futAssigns.put(grpAssign.getKey(), new GridFutureAdapter<>());
+
+                totalParts += grpAssign.getValue().size();
+            }
+        }
+
+        this.cctx = cctx;
+        this.rebalanceId = rebalanceId;
+
+        exchId = exchFut.exchangeId();
+        topVer = exchFut.topologyVersion();
+        log = cctx.kernalContext().log(getClass());
+        totalPartitionsCnt = totalParts;
+        remaining = Collections.unmodifiableMap(remaining0);
+
+        restoreHnd = new PartitionRestoreHandler(cctx);
+    }
+
+    /**
+     * Start partitions preloading.
+     *
+     * @return Cache group identifiers with futures that will be completed 
when partitions are preloaded.
+     */
+    public Map<Integer, IgniteInternalFuture<GridDhtPreloaderAssignments>> 
startPartitionsPreloading() {
+        assert !remaining.isEmpty();
+
+        restoreHnd.start();
+
+        requestPartitionsSnapshot(remaining.entrySet().iterator());
 
 Review comment:
   Please, use a more straight forward way instead of the self-iterable 
`requestPartitionsSnapshot` method. 
   
   For instance,
   
   ```
           IgniteInternalFuture<?> reqSnapshotChain = new 
GridFinishedFuture<>();
           
           for (Map.Entry<UUID, Map<Integer, Set<Integer>>> e : 
remaining.entrySet())
               reqSnapshotChain = reqSnapshotChain.chain(f -> 
requestPartitionsSnapshot(e.getKey(), e.getValue()));
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to