sanpwc commented on a change in pull request #103:
URL: https://github.com/apache/ignite-3/pull/103#discussion_r620151195



##########
File path: 
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
##########
@@ -52,9 +72,121 @@ public AffinityManager(
         this.configurationMgr = configurationMgr;
         this.metaStorageMgr = metaStorageMgr;
         this.baselineMgr = baselineMgr;
+
+        String localNodeName = 
configurationMgr.configurationRegistry().getConfiguration(NetworkConfiguration.KEY)
+            .name().value();
+
+        
configurationMgr.configurationRegistry().getConfiguration(LocalConfiguration.KEY)
+            .metastorageMembers().listen(ctx -> {
+                if (ctx.newValue() != null) {
+                    if (hasMetastorageLocally(localNodeName, ctx.newValue()))
+                        subscribeToAssignmentCalculation();
+                    else
+                        unsubscribeFromAssignmentCalculation();
+                }
+            return CompletableFuture.completedFuture(null);
+        });
+
+        String[] metastorageMembers = 
configurationMgr.configurationRegistry().getConfiguration(LocalConfiguration.KEY)
+            .metastorageMembers().value();
+
+        if (hasMetastorageLocally(localNodeName, metastorageMembers))
+            subscribeToAssignmentCalculation();
     }
 
-    // TODO: IGNITE-14237 Affinity function.
-    // TODO: IGNITE-14238 Creating and destroying caches.
-    // TODO: IGNITE-14235 Provide a minimal cache/table configuration.
+    /**
+     * Checks whether the local node hosts Metastorage.
+     *
+     * @param localNodeName Local node uniq name.
+     * @param metastorageMembers Metastorage members names.
+     * @return True if the node has Metastorage, false otherwise.
+     */
+    private boolean hasMetastorageLocally(String localNodeName, String[] 
metastorageMembers) {
+        boolean isLocalNodeHasMetasorage = false;
+
+        for (String name : metastorageMembers) {
+            if (name.equals(localNodeName)) {
+                isLocalNodeHasMetasorage = true;
+
+                break;
+            }
+        }
+        return isLocalNodeHasMetasorage;
+    }
+
+    /**
+     * Subscribes to metastorage members update.
+     */
+    private void subscribeToAssignmentCalculation() {
+        assert affinityCalculateSubscriptionFut == null : "Affinity 
calculation already subscribed";
+
+        String tableInternalPrefix = INTERNAL_PREFIX + "assignment.#";
+
+        affinityCalculateSubscriptionFut = metaStorageMgr.registerWatch(new 
Key(tableInternalPrefix), new WatchListener() {
+            @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> 
events) {
+                for (WatchEvent evt : events) {
+                    if (ArrayUtils.empty(evt.newEntry().value())) {
+                        String keyTail = 
evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length());
+
+                        String placeholderValue = keyTail.substring(0, 
keyTail.indexOf('.'));
+
+                        UUID tblId = UUID.fromString(placeholderValue);
+
+                        try {
+                            String name = new String(metaStorageMgr.get(
+                                new Key(INTERNAL_PREFIX + 
tblId.toString())).get()
+                                .value(), StandardCharsets.UTF_8);
+
+                            int partitions = 
configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
+                                .tables().get(name).partitions().value();
+                            int replicas = 
configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
+                                .tables().get(name).replicas().value();
+
+                            metaStorageMgr.put(evt.newEntry().key(), 
IgniteUtils.toBytes(

Review comment:
        Please use invoke instead of put, with cas based on revision of empty 
INTERNAL_PREFIX + "assignment.<tblId>".




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to