alex-plekhanov commented on code in PR #10140:
URL: https://github.com/apache/ignite/pull/10140#discussion_r938492862


##########
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java:
##########
@@ -81,42 +97,41 @@ else if (topVer.equals(lastTop.topVer)) {
      * @param cacheId Cache id.
      */
     public boolean affinityUpdateRequired(int cacheId) {
-        TopologyNodes top = lastTop.get();
-
-        if (top == null) { // Don't know current topology.
-            pendingCacheIds.add(cacheId);
-
-            return false;
-        }
-
-        ClientCacheAffinityMapping mapping = affinityMapping;
-
-        if (mapping == null) {
-            pendingCacheIds.add(cacheId);
-
-            return true;
-        }
+        ClientCacheAffinityMapping mapping = currentMapping();
 
-        if (top.topVer.compareTo(mapping.topologyVersion()) > 0) {
+        if (mapping == null || !mapping.cacheIds().contains(cacheId)) {
             pendingCacheIds.add(cacheId);
 
             return true;
         }
 
-        if (mapping.cacheIds().contains(cacheId))
-            return false;
-        else {
-            pendingCacheIds.add(cacheId);
-
-            return true;
-        }
+        return false;
     }
 
     /**
      * @param ch Payload output channel.
      */
     public void writePartitionsUpdateRequest(PayloadOutputChannel ch) {
-        ClientCacheAffinityMapping.writeRequest(ch, pendingCacheIds);
+        assert rq == null : "Previous mapping request was not properly handled: " + rq;
+
+        final Set<Integer> cacheIds;
+        long lastAccessed;
+
+        synchronized (cacheKeyMapperFactoryMap) {
+            cacheIds = new HashSet<>(pendingCacheIds);
+
+            pendingCacheIds.removeAll(cacheIds);

Review Comment:
   It's dangerous to remove pending cache ids on request write; we can do it only on response read (if one of the servers is unavailable, the pending cache ids will be lost).
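   
   Roughly what I mean (a sketch only; `rq`, `pendingCacheIds` and `lastAccessed` are the names from this PR, the `CacheMappingRequest` constructor arguments are a guess):
   ```java
   public void writePartitionsUpdateRequest(PayloadOutputChannel ch) {
       // Snapshot the pending ids, but don't remove them here: if this request fails
       // (e.g. the server node goes down), the ids still have to be re-requested later.
       Set<Integer> cacheIds = new HashSet<>(pendingCacheIds);

       // Constructor arguments guessed from the rq0.caches / rq0.ts usage below.
       rq = new CacheMappingRequest(cacheIds, lastAccessed);

       ClientCacheAffinityMapping.writeRequest(ch, cacheIds);
   }
   ```
   and move `pendingCacheIds.removeAll(...)` to `readPartitionsUpdateResponse`, after the response has been read successfully.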



##########
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java:
##########
@@ -126,26 +141,50 @@ public synchronized boolean readPartitionsUpdateResponse(PayloadInputChannel ch)
         if (lastTop.get() == null)
             return false;
 
-        ClientCacheAffinityMapping newMapping = ClientCacheAffinityMapping.readResponse(ch);
+        CacheMappingRequest rq0 = rq;
+
+        ClientCacheAffinityMapping newMapping = ClientCacheAffinityMapping.readResponse(ch,
+            new Function<Integer, Function<Integer, ClientPartitionAwarenessMapper>>() {
+                @Override public Function<Integer, ClientPartitionAwarenessMapper> apply(Integer cacheId) {
+                    synchronized (cacheKeyMapperFactoryMap) {
+                        ClientPartitionAwarenessMapperHolder hld = cacheKeyMapperFactoryMap.get(cacheId);
+
+                        // Factory concurrently removed on cache destroy.
+                        if (hld == null || hld.ts == REMOVED_TS)
+                            return null;
+
+                        return hld.factory;
+                    }
+                }
+            }
+        );
+
+        // Clean up outdated factories.
+        rq0.caches.removeAll(newMapping.cacheIds());
+
+        synchronized (cacheKeyMapperFactoryMap) {
+            cacheKeyMapperFactoryMap.entrySet()
+                .removeIf(e -> rq0.caches.contains(e.getKey())
+                    && e.getValue().ts <= rq0.ts);
+        }
+
+        rq = null;
 
         ClientCacheAffinityMapping oldMapping = affinityMapping;
 
-        if (oldMapping == null || newMapping.topologyVersion().compareTo(oldMapping.topologyVersion()) > 0) {
+        if (oldMapping == null || newMapping.compareTo(oldMapping) > 0) {
             affinityMapping = newMapping;
 
+            // Re-request mappings that are out of date.
             if (oldMapping != null)
                 pendingCacheIds.addAll(oldMapping.cacheIds());
 
-            pendingCacheIds.removeAll(newMapping.cacheIds());

Review Comment:
   We should exclude cache ids we are already aware of (with such a change, duplicated information will be requested with the next request).
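   
   Something like this (a sketch, restoring the removed `removeAll` right after the new mapping is applied):
   ```java
   if (oldMapping == null || newMapping.compareTo(oldMapping) > 0) {
       affinityMapping = newMapping;

       // Re-request mappings that are out of date.
       if (oldMapping != null)
           pendingCacheIds.addAll(oldMapping.cacheIds());

       // ...but exclude the caches the new mapping already covers, otherwise the same
       // information will be requested again with the next CACHE_PARTITIONS request.
       pendingCacheIds.removeAll(newMapping.cacheIds());
   }
   ```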



##########
modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java:
##########
@@ -55,6 +63,81 @@ public void testResourcesReleasedAfterClientClosed() throws Exception {
         assertTrue(GridTestUtils.waitForCondition(() -> threadsCount(THREAD_PREFIX) == 0, 1_000L));
     }
 
+    /**
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testResourcesReleasedAfterCacheDestroyed() throws Exception {
+        int cacheId = CU.cacheId(PART_CUSTOM_AFFINITY_CACHE_NAME);
+        startGrids(2);
+
+        initClient(getClientConfiguration(0, 1)
+            .setPartitionAwarenessMapperFactory(new ClientPartitionAwarenessMapperFactory() {
+                /** {@inheritDoc} */
+                @Override public ClientPartitionAwarenessMapper create(String cacheName, int partitions) {
+                    assertEquals(cacheName, PART_CUSTOM_AFFINITY_CACHE_NAME);
+
+                    AffinityFunction aff = new RendezvousAffinityFunction(false, partitions);
+
+                    return aff::partition;
+                }
+            }), 0, 1);
+
+        ClientCache<Object, Object> clientCache = client.cache(PART_CUSTOM_AFFINITY_CACHE_NAME);
+        IgniteInternalCache<Object, Object> gridCache = grid(0).context().cache().cache(PART_CUSTOM_AFFINITY_CACHE_NAME);
+
+        clientCache.put(0, 0);
+        TestTcpClientChannel opCh = affinityChannel(0, gridCache);
+
+        assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
+        assertOpOnChannel(opCh, ClientOperation.CACHE_PUT);
+
+        for (int i = 1; i < KEY_CNT; i++)
+            clientCache.put(i, i);
+
+        ClientCacheAffinityContext affCtx = clientAffinityContext(client);
+        AffinityTopologyVersion ver = affCtx.currentMapping().topologyVersion();
+
+        grid(0).destroyCache(PART_CUSTOM_AFFINITY_CACHE_NAME);
+        awaitPartitionMapExchange();
+
+        // Cache destroyed, but mappings still exist on the client side.
+        assertEquals(opCh.serverNodeId(), affCtx.affinityNode(cacheId, Integer.valueOf(0)));
+
+        client.cache(PART_CACHE_NAME).put(1, 1);
+
+        // await mappings updated.
+        assertTrue(GridTestUtils.waitForCondition(() -> {
+            ClientCacheAffinityMapping m = affCtx.currentMapping();
+
+            if (m == null)
+                return false;
+
+            return m.topologyVersion().equals(ver.nextMinorVersion());
+        }, 5_000L));
+
+        assertNull(affCtx.currentMapping().affinityNode(cacheId, 0));

Review Comment:
   I think it's more important to check that `cacheKeyMapperFactoryMap` is cleaned.
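   
   For example (a sketch; `cacheKeyMapperFactoryMap` is the field added in this PR, read via reflection here just for illustration):
   ```java
   Map<Integer, ?> factories = GridTestUtils.getFieldValue(affCtx, "cacheKeyMapperFactoryMap");

   // The mapper factory registered for the destroyed cache must be dropped as well.
   assertTrue(GridTestUtils.waitForCondition(() -> !factories.containsKey(cacheId), 5_000L));
   ```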



##########
modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java:
##########
@@ -55,6 +63,81 @@ public void testResourcesReleasedAfterClientClosed() throws Exception {
         assertTrue(GridTestUtils.waitForCondition(() -> threadsCount(THREAD_PREFIX) == 0, 1_000L));
     }
 
+    /**
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testResourcesReleasedAfterCacheDestroyed() throws Exception {
+        int cacheId = CU.cacheId(PART_CUSTOM_AFFINITY_CACHE_NAME);
+        startGrids(2);
+
+        initClient(getClientConfiguration(0, 1)
+            .setPartitionAwarenessMapperFactory(new ClientPartitionAwarenessMapperFactory() {
+                /** {@inheritDoc} */
+                @Override public ClientPartitionAwarenessMapper create(String cacheName, int partitions) {
+                    assertEquals(cacheName, PART_CUSTOM_AFFINITY_CACHE_NAME);
+
+                    AffinityFunction aff = new RendezvousAffinityFunction(false, partitions);
+
+                    return aff::partition;
+                }
+            }), 0, 1);
+
+        ClientCache<Object, Object> clientCache = client.cache(PART_CUSTOM_AFFINITY_CACHE_NAME);
+        IgniteInternalCache<Object, Object> gridCache = grid(0).context().cache().cache(PART_CUSTOM_AFFINITY_CACHE_NAME);
+
+        clientCache.put(0, 0);
+        TestTcpClientChannel opCh = affinityChannel(0, gridCache);
+
+        assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
+        assertOpOnChannel(opCh, ClientOperation.CACHE_PUT);
+
+        for (int i = 1; i < KEY_CNT; i++)
+            clientCache.put(i, i);
+
+        ClientCacheAffinityContext affCtx = clientAffinityContext(client);
+        AffinityTopologyVersion ver = affCtx.currentMapping().topologyVersion();
+
+        grid(0).destroyCache(PART_CUSTOM_AFFINITY_CACHE_NAME);
+        awaitPartitionMapExchange();
+
+        // Cache destroyed, but mappings still exist on the client side.
+        assertEquals(opCh.serverNodeId(), affCtx.affinityNode(cacheId, Integer.valueOf(0)));
+
+        client.cache(PART_CACHE_NAME).put(1, 1);
+
+        // await mappings updated.
+        assertTrue(GridTestUtils.waitForCondition(() -> {
+            ClientCacheAffinityMapping m = affCtx.currentMapping();
+
+            if (m == null)
+                return false;
+
+            return m.topologyVersion().equals(ver.nextMinorVersion());
+        }, 5_000L));
+
+        assertNull(affCtx.currentMapping().affinityNode(cacheId, 0));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+        client.close();
+    }
+
+    /**
+     * @param clnt Ignite client.
+     * @return Client context.
+     */
+    private static ClientCacheAffinityContext clientAffinityContext(IgniteClient clnt) {
+        assertTrue(clnt instanceof TcpIgniteClient);
+
+        return GridTestUtils.getFieldValue(GridTestUtils.getFieldValue(clnt, "ch"),

Review Comment:
   Perhaps it's better to use package-private fields/methods.
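   
   For example (a sketch; `channel()` and `affinityContext()` are hypothetical package-private accessors, not existing methods):
   ```java
   // Test helper without reflection:
   private static ClientCacheAffinityContext clientAffinityContext(IgniteClient clnt) {
       assertTrue(clnt instanceof TcpIgniteClient);

       return ((TcpIgniteClient)clnt).channel().affinityContext();
   }
   ```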


