anton-vinogradov commented on a change in pull request #9807:
URL: https://github.com/apache/ignite/pull/9807#discussion_r811709168



##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairAbstractFuture.java
##########
@@ -145,101 +150,94 @@ protected GridNearReadRepairAbstractFuture(
         canRemap = topVer == null;
 
         this.topVer = canRemap ? ctx.affinity().affinityTopologyVersion() : 
topVer;
-    }
 
-    /**
-     *
-     */
-    protected final void init() {
-        map();
+        Map<ClusterNode, Collection<KeyCacheObject>> mappings = new 
HashMap<>();
+
+        for (KeyCacheObject key : keys) {
+            List<ClusterNode> nodes = ctx.affinity().nodesByKey(key, 
this.topVer);
+
+            primaries.put(key, nodes.get(0));
+
+            for (ClusterNode node : nodes)
+                mappings.computeIfAbsent(node, k -> new HashSet<>()).add(key);
+        }
+
+        if (mappings.isEmpty())
+            onDone(new ClusterTopologyServerNotFoundException("Failed to map 
keys for cache " +
+                "(all partition nodes left the grid) [topVer=" + this.topVer + 
", cache=" + ctx.name() + ']'));
+
+        for (Map.Entry<ClusterNode, Collection<KeyCacheObject>> mapping : 
mappings.entrySet()) {
+            ClusterNode node = mapping.getKey();
+
+            GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut =
+                new GridPartitionedGetFuture<>(
+                    ctx,
+                    mapping.getValue(), // Keys.
+                    readThrough,
+                    false, // Local get required.
+                    taskName,
+                    deserializeBinary,
+                    recovery,
+                    expiryPlc,
+                    false,
+                    true,
+                    true,
+                    tx != null ? tx.label() : null,
+                    tx != null ? tx.mvccSnapshot() : null,
+                    node);
+
+            futs.put(mapping.getKey(), fut);
+
+            fut.listen(this::onResult);
+        }
     }
 
     /**
      *
      */
-    private synchronized void map() {
-        assert futs.isEmpty() : "Remapping started without the clean-up.";
-
-        Map<KeyCacheObject, ClusterNode> primaryNodes = new HashMap<>();
+    protected final void init() {
+        assert !futs.isEmpty();
 
         IgniteInternalTx prevTx = ctx.tm().tx(tx); // Within the original tx.
 
         try {
-            Map<ClusterNode, Collection<KeyCacheObject>> mappings = new 
HashMap<>();
-
-            for (KeyCacheObject key : keys) {
-                List<ClusterNode> nodes = ctx.affinity().nodesByKey(key, 
topVer);
-
-                primaryNodes.put(key, nodes.get(0));
-
-                for (ClusterNode node : nodes)
-                    mappings.computeIfAbsent(node, k -> new 
HashSet<>()).add(key);
-            }
-
-            primaries = primaryNodes;
-
-            for (Map.Entry<ClusterNode, Collection<KeyCacheObject>> mapping : 
mappings.entrySet()) {
-                ClusterNode node = mapping.getKey();
-
-                GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut =
-                    new GridPartitionedGetFuture<>(
-                        ctx,
-                        mapping.getValue(), // Keys.
-                        readThrough,
-                        false, // Local get required.
-                        taskName,
-                        deserializeBinary,
-                        recovery,
-                        expiryPlc,
-                        false,
-                        true,
-                        true,
-                        tx != null ? tx.label() : null,
-                        tx != null ? tx.mvccSnapshot() : null,
-                        node);
-
-                fut.listen(this::onResult);
-
-                futs.put(mapping.getKey(), fut);
-            }
-
             for (GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut 
: futs.values())
                 fut.init(topVer);
-
-            if (futs.isEmpty())
-                onDone(new ClusterTopologyServerNotFoundException("Failed to 
map keys for cache " +
-                    "(all partition nodes left the grid) [topVer=" + topVer + 
", cache=" + ctx.name() + ']'));
         }
         finally {
             ctx.tm().tx(prevTx);
         }
     }
 
     /**
-     * @param topVer Topology version.
+     * @param fut Future to be notified.
      */
-    protected final void remap(AffinityTopologyVersion topVer) {
-        futs.clear();
+    protected final void initOnRemap(GridNearReadRepairAbstractFuture fut) {
+        remapCnt = fut.remapCnt + 1;
+
+        listen(f -> {
+            assert !fut.isDone();
 
-        this.topVer = topVer;
+            fut.onDone(f.result(), f.error());
+        });
 
-        map();
+        init();
     }
 
+    /**
+     * @param topVer Topology version.
+     */
+    protected abstract void remap(AffinityTopologyVersion topVer);
+
     /**
      * Collects results of each 'get' future and prepares an overall result of 
the operation.
      *
      * @param finished Future represents a result of GET operation.
      */
-    protected final synchronized void 
onResult(IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> finished) {
-        if (isDone() // All subfutures (including currently processing) were 
successfully finished at previous future processing.
-            || (topVer == null) // Remapping, ignoring any updates until 
remapped.
-            || !futs.containsValue((GridPartitionedGetFuture<KeyCacheObject, 
EntryGetResult>)finished)) // Remapped.
-            return;
-
+    protected final void onResult(IgniteInternalFuture<Map<KeyCacheObject, 
EntryGetResult>> finished) {

Review comment:
       Seems, currently possible to remap the future on each failure.
   Future should be remapped only once.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to