korlov42 commented on code in PR #2982:
URL: https://github.com/apache/ignite-3/pull/2982#discussion_r1444562809


##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java:
##########
@@ -85,6 +94,10 @@ public MappingServiceImpl(
         this.targetProvider = targetProvider;
         this.templatesCache = cacheFactory.create(cacheSize);
         this.taskExecutor = taskExecutor;
+        this.mappingsCache = Caffeine.newBuilder()

Review Comment:
   `templatesCache` is created by `cacheFactory`. What is the reason to create 
`mappingsCache` directly by calling Caffeine's cache builder?



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java:
##########
@@ -96,12 +109,53 @@ public CompletableFuture<List<MappedFragment>> 
map(MultiStepPlan multiStepPlan)
         return initialTopologyFuture.thenComposeAsync(ignore -> 
map0(multiStepPlan), taskExecutor);
     }
 
+    /** Called when the primary replica has expired. */
+    public CompletableFuture<Boolean> 
onPrimaryReplicaExpired(PrimaryReplicaEventParameters parameters) {
+        assert parameters != null;
+        assert parameters.groupId() instanceof TablePartitionId;
+
+        int tabId = ((TablePartitionId) parameters.groupId()).tableId();
+
+        // TODO https://issues.apache.org/jira/browse/IGNITE-21201 Move 
complex computations to a different thread.
+        mappingsCache.values().removeIf(value -> 
value.tableIds.contains(tabId));
+
+        return CompletableFutures.falseCompletedFuture();
+    }
+
     private CompletableFuture<List<MappedFragment>> map0(MultiStepPlan 
multiStepPlan) {
-        List<String> nodes = topologyHolder.nodes();
-        MappingContext context = new MappingContext(localNodeName, nodes);
+        TopSnapshot topology = topologyHolder.topology();
+        MappingContext context = new MappingContext(localNodeName, 
topology.nodes());
 
         FragmentsTemplate template = getOrCreateTemplate(multiStepPlan, 
context);
 
+        MappingsCacheValue cacheValue = 
mappingsCache.compute(multiStepPlan.id(), (key, val) -> {
+            if (val == null) {
+                IntSet tableIds = new IntOpenHashSet();
+                boolean topAware = false;

Review Comment:
   nitpicking: it's better to avoid abbreviations, with the exception of 
commonly used ones (`topAware` --> `topologyAware`, `TopSnapshot` --> `TopologySnapshot`)



##########
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java:
##########
@@ -225,6 +229,10 @@ public void initSchema(String script) {
         }
     }
 
+    public CatalogService catalogService() {
+        return catalogService;

Review Comment:
   Does it make sense to expose `CatalogService` at `TestCluster` instead of 
`TestNode`? It looks a bit strange that `catalogService` is passed to a node 
just to be returned by a getter.



##########
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java:
##########
@@ -134,6 +164,66 @@ public void lateServiceInitializationOnNodeJoin() {
         assertThat(mappingService.map(PLAN), willSucceedFast());
     }
 
+    @Test
+    public void testCacheInvalidationOnTopologyChange() {
+        String localNodeName = "NODE";
+        List<String> nodeNames = List.of(localNodeName, "NODE1");
+
+        // Initialize mapping service.
+        MappingServiceImpl mappingService = 
createMappingService(localNodeName, nodeNames);
+        mappingService.onNodeJoined(Mockito.mock(LogicalNode.class),
+                new LogicalTopologySnapshot(1, 
logicalNodes(nodeNames.toArray(new String[0]))));
+
+        List<MappedFragment> tableOnlyMapping = 
await(mappingService.map(PLAN));
+        List<MappedFragment> sysViewMapping = 
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW));
+
+        assertSame(tableOnlyMapping, await(mappingService.map(PLAN)));
+        assertSame(sysViewMapping, 
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW)));
+
+        mappingService.onNodeLeft(Mockito.mock(LogicalNode.class),
+                new LogicalTopologySnapshot(2, logicalNodes("NODE")));
+        // Plan with tables only must not be invalidated.
+        assertSame(tableOnlyMapping, await(mappingService.map(PLAN)));
+        // Plan with system views must be invalidated.
+        assertNotSame(sysViewMapping, 
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW)));
+    }
+
+    @Test
+    public void testCacheInvalidationOnPrimaryExpiration() {
+        String localNodeName = "NODE";
+        List<String> nodeNames = List.of(localNodeName, "NODE1");
+
+        Function<String, PrimaryReplicaEventParameters> prepareEvtParams = 
(name) -> {
+            CatalogService catalogService = 
cluster.node("N1").catalogService();
+            Catalog catalog = 
catalogService.catalog(catalogService.latestCatalogVersion());
+
+            Optional<Integer> tblId = catalog.tables().stream()
+                    .filter(desc -> name.equals(desc.name()))
+                    .findFirst()
+                    .map(CatalogObjectDescriptor::id);
+
+            assertTrue(tblId.isPresent());
+
+            return new PrimaryReplicaEventParameters(
+                    0, new TablePartitionId(tblId.get(), 0), "test", 
HybridTimestamp.MIN_VALUE);
+        };
+
+        // Initialize mapping service.
+        MappingServiceImpl mappingService = 
createMappingService(localNodeName, nodeNames);
+        mappingService.onNodeJoined(Mockito.mock(LogicalNode.class),
+                new LogicalTopologySnapshot(1, 
logicalNodes(nodeNames.toArray(new String[0]))));
+
+        List<MappedFragment> mappedFragments = 
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW));
+
+        // Simulate expiration of the primary replica for non-mapped table - 
the cache entry should not be invalidated.
+        
await(mappingService.onPrimaryReplicaExpired(prepareEvtParams.apply("T2")));
+        assertSame(mappedFragments, 
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW)));
+
+        // Simulate expiration of the primary replica for mapped table - the 
cache entry should be invalidated.
+        
await(mappingService.onPrimaryReplicaExpired(prepareEvtParams.apply("T1")));
+        assertNotSame(mappedFragments, 
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW)));
+    }
+

Review Comment:
   What do you think about adding a few more tests with mocks? It would be nice 
to check not only that the returned objects are the same, but also that 
`ExecutionTargetProvider` is not called when the result is expected to come 
from the cache.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to