AMashenkov commented on code in PR #3221:
URL: https://github.com/apache/ignite-3/pull/3221#discussion_r1491308787


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java:
##########
@@ -437,21 +442,57 @@ class OnUpdateHandlerImpl implements OnUpdateHandler {
         @Override
         public CompletableFuture<Void> handle(UpdateLogEvent event, 
HybridTimestamp metaStorageUpdateTimestamp, long causalityToken) {
             if (event instanceof SnapshotEntry) {
-                return handle((SnapshotEntry) event);
+                return handle((SnapshotEntry) event, causalityToken);
             }
 
             return handle((VersionedUpdate) event, metaStorageUpdateTimestamp, 
causalityToken);
         }
 
-        private CompletableFuture<Void> handle(SnapshotEntry event) {
+        private CompletableFuture<Void> handle(SnapshotEntry event, long 
causalityToken) {
             Catalog catalog = event.snapshot();
 
+            // Use reverse order to find latest descriptors.
+            Collection<Catalog> droppedCatalogVersions = 
catalogByVer.headMap(catalog.version(), false).descendingMap().values();
+
+            // Collect destroy events for dropped tables/indexes.
+            IntSet droppedObjects = new IntOpenHashSet();
+            List<Fireable> events = new ArrayList<>();
+
+            droppedCatalogVersions.forEach(oldCatalog -> 
oldCatalog.indexes().stream()
+                    .filter(idx -> catalog.index(idx.id()) == null)
+                    .filter(idx -> droppedObjects.add(idx.id()))
+                    .forEach(idx -> events.add(
+                            new DestroyIndexEvent(idx.id(), idx.tableId(), 
tableZoneDescriptor(oldCatalog, idx.tableId()).partitions()))
+                    ));
+
+            droppedObjects.clear();
+            droppedCatalogVersions.forEach(oldCatalog -> 
oldCatalog.tables().stream()
+                    .filter(tbl -> catalog.table(tbl.id()) == null)
+                    .filter(tbl -> droppedObjects.add(tbl.id()))
+                    .forEach(tbl -> events.add(new DestroyTableEvent(tbl.id(), 
tableZoneDescriptor(oldCatalog, tbl.id()).partitions()))));
+
             // On recovery phase, we must register catalog from the snapshot.
             // In other cases, it is ok to rewrite an existed version, because 
it's exactly the same.
             registerCatalog(catalog);
-            truncateUpTo(catalog);
 
-            return nullCompletedFuture();
+            List<CompletableFuture<?>> eventFutures = new 
ArrayList<>(events.size());
+
+            for (Fireable fireEvent : events) {
+                eventFutures.add(fireEvent(
+                        fireEvent.eventType(),
+                        fireEvent.createEventParameters(causalityToken, 
catalog.version())
+                ));
+            }
+
+            return allOf(eventFutures.toArray(CompletableFuture[]::new))
+                    .whenComplete((ignore, err) -> {
+                        if (err != null) {
+                            LOG.warn("Failed to compact catalog.", err);
+                            //TODO: IGNITE-14611 Pass exception to an error 
handler?
+                        } else {
+                            truncateUpTo(catalog);

Review Comment:
   I'm not sure we can truncate the catalog right after the events are fired 
and still be sure that all components have retrieved all the required info 
from the catalog during the synchronous `notify` call.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to