anton-vinogradov commented on a change in pull request #9807: URL: https://github.com/apache/ignite/pull/9807#discussion_r811709168
########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairAbstractFuture.java ########## @@ -145,101 +150,94 @@ protected GridNearReadRepairAbstractFuture( canRemap = topVer == null; this.topVer = canRemap ? ctx.affinity().affinityTopologyVersion() : topVer; - } - /** - * - */ - protected final void init() { - map(); + Map<ClusterNode, Collection<KeyCacheObject>> mappings = new HashMap<>(); + + for (KeyCacheObject key : keys) { + List<ClusterNode> nodes = ctx.affinity().nodesByKey(key, this.topVer); + + primaries.put(key, nodes.get(0)); + + for (ClusterNode node : nodes) + mappings.computeIfAbsent(node, k -> new HashSet<>()).add(key); + } + + if (mappings.isEmpty()) + onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + + "(all partition nodes left the grid) [topVer=" + this.topVer + ", cache=" + ctx.name() + ']')); + + for (Map.Entry<ClusterNode, Collection<KeyCacheObject>> mapping : mappings.entrySet()) { + ClusterNode node = mapping.getKey(); + + GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut = + new GridPartitionedGetFuture<>( + ctx, + mapping.getValue(), // Keys. + readThrough, + false, // Local get required. + taskName, + deserializeBinary, + recovery, + expiryPlc, + false, + true, + true, + tx != null ? tx.label() : null, + tx != null ? tx.mvccSnapshot() : null, + node); + + futs.put(mapping.getKey(), fut); + + fut.listen(this::onResult); + } } /** * */ - private synchronized void map() { - assert futs.isEmpty() : "Remapping started without the clean-up."; - - Map<KeyCacheObject, ClusterNode> primaryNodes = new HashMap<>(); + protected final void init() { + assert !futs.isEmpty(); IgniteInternalTx prevTx = ctx.tm().tx(tx); // Within the original tx. 
try { - Map<ClusterNode, Collection<KeyCacheObject>> mappings = new HashMap<>(); - - for (KeyCacheObject key : keys) { - List<ClusterNode> nodes = ctx.affinity().nodesByKey(key, topVer); - - primaryNodes.put(key, nodes.get(0)); - - for (ClusterNode node : nodes) - mappings.computeIfAbsent(node, k -> new HashSet<>()).add(key); - } - - primaries = primaryNodes; - - for (Map.Entry<ClusterNode, Collection<KeyCacheObject>> mapping : mappings.entrySet()) { - ClusterNode node = mapping.getKey(); - - GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut = - new GridPartitionedGetFuture<>( - ctx, - mapping.getValue(), // Keys. - readThrough, - false, // Local get required. - taskName, - deserializeBinary, - recovery, - expiryPlc, - false, - true, - true, - tx != null ? tx.label() : null, - tx != null ? tx.mvccSnapshot() : null, - node); - - fut.listen(this::onResult); - - futs.put(mapping.getKey(), fut); - } - for (GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut : futs.values()) fut.init(topVer); - - if (futs.isEmpty()) - onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + - "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + ctx.name() + ']')); } finally { ctx.tm().tx(prevTx); } } /** - * @param topVer Topology version. + * @param fut Future to be notified. */ - protected final void remap(AffinityTopologyVersion topVer) { - futs.clear(); + protected final void initOnRemap(GridNearReadRepairAbstractFuture fut) { + remapCnt = fut.remapCnt + 1; + + listen(f -> { + assert !fut.isDone(); - this.topVer = topVer; + fut.onDone(f.result(), f.error()); + }); - map(); + init(); } + /** + * @param topVer Topology version. + */ + protected abstract void remap(AffinityTopologyVersion topVer); + /** * Collects results of each 'get' future and prepares an overall result of the operation. * * @param finished Future represents a result of GET operation. 
*/ - protected final synchronized void onResult(IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> finished) { - if (isDone() // All subfutures (including currently processing) were successfully finished at previous future processing. - || (topVer == null) // Remapping, ignoring any updates until remapped. - || !futs.containsValue((GridPartitionedGetFuture<KeyCacheObject, EntryGetResult>)finished)) // Remapped. - return; - + protected final void onResult(IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> finished) { Review comment: It seems that it is currently possible to remap the future on each failure. The future should be remapped only once. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org