sanpwc commented on a change in pull request #103:
URL: https://github.com/apache/ignite-3/pull/103#discussion_r620164032



##########
File path: 
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
##########
@@ -52,9 +72,121 @@ public AffinityManager(
         this.configurationMgr = configurationMgr;
         this.metaStorageMgr = metaStorageMgr;
         this.baselineMgr = baselineMgr;
+
+        String localNodeName = 
configurationMgr.configurationRegistry().getConfiguration(NetworkConfiguration.KEY)
+            .name().value();
+
+        
configurationMgr.configurationRegistry().getConfiguration(LocalConfiguration.KEY)
+            .metastorageMembers().listen(ctx -> {
+                if (ctx.newValue() != null) {
+                    if (hasMetastorageLocally(localNodeName, ctx.newValue()))
+                        subscribeToAssignmentCalculation();
+                    else
+                        unsubscribeFromAssignmentCalculation();
+                }
+            return CompletableFuture.completedFuture(null);
+        });
+
+        String[] metastorageMembers = 
configurationMgr.configurationRegistry().getConfiguration(LocalConfiguration.KEY)
+            .metastorageMembers().value();
+
+        if (hasMetastorageLocally(localNodeName, metastorageMembers))
+            subscribeToAssignmentCalculation();
     }
 
-    // TODO: IGNITE-14237 Affinity function.
-    // TODO: IGNITE-14238 Creating and destroying caches.
-    // TODO: IGNITE-14235 Provide a minimal cache/table configuration.
+    /**
+     * Checks whether the local node hosts Metastorage.
+     *
+     * @param localNodeName Local node uniq name.
+     * @param metastorageMembers Metastorage members names.
+     * @return True if the node has Metastorage, false otherwise.
+     */
+    private boolean hasMetastorageLocally(String localNodeName, String[] 
metastorageMembers) {
+        boolean isLocalNodeHasMetasorage = false;
+
+        for (String name : metastorageMembers) {
+            if (name.equals(localNodeName)) {
+                isLocalNodeHasMetasorage = true;
+
+                break;
+            }
+        }
+        return isLocalNodeHasMetasorage;
+    }
+
+    /**
+     * Subscribes to metastorage members update.
+     */
+    private void subscribeToAssignmentCalculation() {
+        assert affinityCalculateSubscriptionFut == null : "Affinity 
calculation already subscribed";
+
+        String tableInternalPrefix = INTERNAL_PREFIX + "assignment.#";
+
+        affinityCalculateSubscriptionFut = metaStorageMgr.registerWatch(new 
Key(tableInternalPrefix), new WatchListener() {
+            @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> 
events) {
+                for (WatchEvent evt : events) {
+                    if (ArrayUtils.empty(evt.newEntry().value())) {
+                        String keyTail = 
evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length());
+
+                        String placeholderValue = keyTail.substring(0, 
keyTail.indexOf('.'));
+
+                        UUID tblId = UUID.fromString(placeholderValue);
+
+                        try {
+                            String name = new String(metaStorageMgr.get(
+                                new Key(INTERNAL_PREFIX + 
tblId.toString())).get()
+                                .value(), StandardCharsets.UTF_8);
+
+                            int partitions = 
configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
+                                .tables().get(name).partitions().value();
+                            int replicas = 
configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
+                                .tables().get(name).replicas().value();
+
+                            metaStorageMgr.put(evt.newEntry().key(), 
IgniteUtils.toBytes(
+                                RendezvousAffinityFunction.assignPartitions(
+                                    baselineMgr.nodes(),
+                                    partitions,
+                                    replicas,
+                                    false,
+                                    null
+                                ))
+                            );
+
+                            LOG.info("Affinity manager calculated assignment 
for the table [name={}, tblId={}]",
+                                name, tblId);
+                        }
+                        catch (InterruptedException | ExecutionException e) {
+                            LOG.error("Failed to initialize affinity [key={}]",
+                                evt.newEntry().key().toString(), e);
+                        }
+                    }
+                }
+
+                return false;

Review comment:
       Why does this return false? It seems that you should keep watching 
for new table creation even after the assignment has been calculated for 
the first batch of tables.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to