alex-plekhanov commented on code in PR #10140:
URL: https://github.com/apache/ignite/pull/10140#discussion_r941232810


##########
modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java:
##########
@@ -55,6 +63,80 @@ public void testResourcesReleasedAfterClientClosed() throws Exception {
         assertTrue(GridTestUtils.waitForCondition(() -> threadsCount(THREAD_PREFIX) == 0, 1_000L));
     }
 
+    /**
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testResourcesReleasedAfterCacheDestroyed() throws Exception {
+        int cacheId = CU.cacheId(PART_CUSTOM_AFFINITY_CACHE_NAME);
+        startGrids(2);
+
+        initClient(getClientConfiguration(0, 1)
+            .setPartitionAwarenessMapperFactory(new ClientPartitionAwarenessMapperFactory() {
+                /** {@inheritDoc} */
+                @Override public ClientPartitionAwarenessMapper create(String cacheName, int partitions) {
+                    assertEquals(cacheName, PART_CUSTOM_AFFINITY_CACHE_NAME);
+
+                    AffinityFunction aff = new RendezvousAffinityFunction(false, partitions);
+
+                    return aff::partition;
+                }
+            }), 0, 1);
+
+        ClientCache<Object, Object> clientCache = client.cache(PART_CUSTOM_AFFINITY_CACHE_NAME);
+        IgniteInternalCache<Object, Object> gridCache = grid(0).context().cache().cache(PART_CUSTOM_AFFINITY_CACHE_NAME);
+
+        clientCache.put(0, 0);
+        TestTcpClientChannel opCh = affinityChannel(0, gridCache);
+
+        assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
+        assertOpOnChannel(opCh, ClientOperation.CACHE_PUT);
+
+        for (int i = 1; i < KEY_CNT; i++)
+            clientCache.put(i, i);
+
+        ClientCacheAffinityContext affCtx = ((TcpIgniteClient)client).reliableChannel().affinityContext();
+        AffinityTopologyVersion ver = affCtx.currentMapping().topologyVersion();
+
+        grid(0).destroyCache(PART_CUSTOM_AFFINITY_CACHE_NAME);
+        awaitPartitionMapExchange();
+
+        // Cache destroyed, but mappings still exist on the client side.
+        assertEquals(opCh.serverNodeId(), affCtx.affinityNode(cacheId, Integer.valueOf(0)));
+
+        client.cache(PART_CACHE_NAME).put(1, 1);
+
+        // await mappings updated.
+        assertTrue(GridTestUtils.waitForCondition(() -> {
+            ClientCacheAffinityMapping m = affCtx.currentMapping();
+
+            if (m == null)
+                return false;
+
+            return m.topologyVersion().equals(ver.nextMinorVersion());
+        }, 5_000L));
+
+        // Mapping for previous caches become outdated and will be updated on the next request.
+        assertNull(affCtx.currentMapping().affinityNode(cacheId, 0));
+
+        // Trigger the next affinity mappings update. The outdated cache with custom affinity was added
+        // to pending caches list and will be processed and cleared.
+        client.cache(REPL_CACHE_NAME).put(2, 2);
+
+        Map<?, ?> m = GridTestUtils.getFieldValue(affCtx, "cacheKeyMapperFactoryMap");

Review Comment:
   Can we avoid reflection here?
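
   One possible way to act on this, sketched under assumptions: give the context a small package-private query method so the test can check that the entry was cleared without calling `GridTestUtils.getFieldValue`. Only the field name `cacheKeyMapperFactoryMap` comes from the diff; the class name, the `String` value type, and every method below are hypothetical stand-ins, not the Ignite API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the affinity context; not the real Ignite class. */
final class AffinityContextSketch {
    /** Per-cache mapper registrations; the String value is a placeholder type. */
    private final Map<Integer, String> cacheKeyMapperFactoryMap = new ConcurrentHashMap<>();

    /** Registers a cache with custom affinity (hypothetical method). */
    void registerKeyMapperFactory(int cacheId, String cacheName) {
        cacheKeyMapperFactoryMap.put(cacheId, cacheName);
    }

    /** Drops the registration, e.g. once the cache is known to be destroyed. */
    void unregisterKeyMapperFactory(int cacheId) {
        cacheKeyMapperFactoryMap.remove(cacheId);
    }

    /** Package-private query for tests, so they need no reflection on the private field. */
    boolean hasKeyMapperFactory(int cacheId) {
        return cacheKeyMapperFactoryMap.containsKey(cacheId);
    }
}
```

   With an accessor of this kind, the test could assert on `hasKeyMapperFactory(cacheId)` in place of the reflective field lookup.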



##########
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java:
##########
@@ -61,8 +70,10 @@ public class ClientCacheAffinityContext {
 
     /**
      * @param binary Binary data processor.
+     * @param mapFacotry Factory for caches with custom affinity.
      */
-    public ClientCacheAffinityContext(IgniteBinary binary) {
+    public ClientCacheAffinityContext(IgniteBinary binary, @Nullable ClientPartitionAwarenessMapperFactory mapFacotry) {

Review Comment:
   mapFacotry -> mapFactory



##########
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java:
##########
@@ -141,11 +205,9 @@ public synchronized boolean readPartitionsUpdateResponse(PayloadInputChannel ch)
             return true;
         }
 
-        if (newMapping.topologyVersion().equals(oldMapping.topologyVersion())) {
+        if (newMapping.compareTo(oldMapping) == 0) {
             affinityMapping = ClientCacheAffinityMapping.merge(oldMapping, newMapping);
 
-            pendingCacheIds.removeAll(newMapping.cacheIds());

Review Comment:
   If the topology has not changed, the same caches will be requested again.
   Let's also call `removeAll(rq0.caches)` to avoid leaking cache IDs.
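
   One reading of this comment, as a minimal sketch: if only the IDs present in the response are cleared, an ID that was requested but never answered (for instance, a destroyed cache) stays pending and is re-requested on every update. The class and method names below are hypothetical and only model the bookkeeping; `requested` plays the role of `rq0.caches`.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of the pending-cache bookkeeping; not the Ignite class. */
final class PendingCachesSketch {
    /** Cache IDs whose partition mappings still have to be requested. */
    private final Set<Integer> pendingCacheIds = new HashSet<>();

    void addPending(int cacheId) {
        pendingCacheIds.add(cacheId);
    }

    /** Clears only the answered IDs: requested-but-unanswered IDs stay pending. */
    void onResponseRemoveAnswered(List<Integer> requested, List<Integer> answered) {
        pendingCacheIds.removeAll(answered);
    }

    /** Also clears every requested ID, so unanswered IDs do not accumulate. */
    void onResponseRemoveRequested(List<Integer> requested, List<Integer> answered) {
        pendingCacheIds.removeAll(answered);
        pendingCacheIds.removeAll(requested);
    }

    /** Returns a copy of the IDs that are still pending. */
    Set<Integer> pending() {
        return new HashSet<>(pendingCacheIds);
    }
}
```

   In this model, requesting caches 1 and 2 and receiving a mapping only for cache 1 leaves ID 2 pending in the first variant and leaves nothing pending in the second.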



##########
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java:
##########
@@ -402,6 +403,25 @@ public <T> IgniteClientFuture<T> affinityServiceAsync(
         return serviceAsync(op, payloadWriter, payloadReader);
     }
 
+    /**
+     * @param cacheName Cache name.
+     */
+    public void registerKeyPartitionMapperFactory(String cacheName) {
+        ClientPartitionAwarenessMapperFactory factory = clientCfg.getPartitionAwarenessMapperFactory();
+
+        if (factory == null)
+            return;
+
+        affinityCtx.putKeyMapperFactory(cacheName);

Review Comment:
   Since there is no second parameter now, perhaps the method should also be
   named `register...` instead of `put...`.



##########
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityMapping.java:
##########
@@ -64,6 +69,11 @@ public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
+    /** {@inheritDoc} */
+    @Override public int compareTo(@NotNull ClientCacheAffinityMapping o) {

Review Comment:
   I think it's clearer to compare the topology versions of the mappings directly.
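
   A minimal sketch of the direct comparison, mirroring the condition removed in the `readPartitionsUpdateResponse` hunk. `TopVerSketch` and `MappingSketch` are hypothetical stand-ins for `AffinityTopologyVersion` and `ClientCacheAffinityMapping`; the major/minor fields are an assumption made only to keep the example self-contained.

```java
import java.util.Objects;

/** Hypothetical stand-in for a topology version; not Ignite's AffinityTopologyVersion. */
final class TopVerSketch {
    final long major;
    final int minor;

    TopVerSketch(long major, int minor) {
        this.major = major;
        this.minor = minor;
    }

    @Override public boolean equals(Object o) {
        if (this == o)
            return true;
        if (!(o instanceof TopVerSketch))
            return false;
        TopVerSketch other = (TopVerSketch)o;
        return major == other.major && minor == other.minor;
    }

    @Override public int hashCode() {
        return Objects.hash(major, minor);
    }
}

/** Hypothetical stand-in for the mapping; it only carries its topology version. */
final class MappingSketch {
    private final TopVerSketch topVer;

    MappingSketch(TopVerSketch topVer) {
        this.topVer = topVer;
    }

    TopVerSketch topologyVersion() {
        return topVer;
    }

    /** The call site states what is compared, with no compareTo on the mapping itself. */
    static boolean sameTopology(MappingSketch oldMapping, MappingSketch newMapping) {
        return newMapping.topologyVersion().equals(oldMapping.topologyVersion());
    }
}
```

   Written this way, the mapping does not need to implement `Comparable`, and the reader sees at the call site that only the topology version decides whether the mappings are merged.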



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
