AMashenkov commented on code in PR #3221:
URL: https://github.com/apache/ignite-3/pull/3221#discussion_r1491308787
##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java:
##########
@@ -437,21 +442,57 @@ class OnUpdateHandlerImpl implements OnUpdateHandler {
@Override
public CompletableFuture<Void> handle(UpdateLogEvent event,
HybridTimestamp metaStorageUpdateTimestamp, long causalityToken) {
if (event instanceof SnapshotEntry) {
- return handle((SnapshotEntry) event);
+ return handle((SnapshotEntry) event, causalityToken);
}
return handle((VersionedUpdate) event, metaStorageUpdateTimestamp,
causalityToken);
}
- private CompletableFuture<Void> handle(SnapshotEntry event) {
+ private CompletableFuture<Void> handle(SnapshotEntry event, long
causalityToken) {
Catalog catalog = event.snapshot();
+ // Use reverse order to find latest descriptors.
+ Collection<Catalog> droppedCatalogVersions =
catalogByVer.headMap(catalog.version(), false).descendingMap().values();
+
+ // Collect destroy events for dropped tables/indexes.
+ IntSet droppedObjects = new IntOpenHashSet();
+ List<Fireable> events = new ArrayList<>();
+
+ droppedCatalogVersions.forEach(oldCatalog ->
oldCatalog.indexes().stream()
+ .filter(idx -> catalog.index(idx.id()) == null)
+ .filter(idx -> droppedObjects.add(idx.id()))
+ .forEach(idx -> events.add(
+ new DestroyIndexEvent(idx.id(), idx.tableId(),
tableZoneDescriptor(oldCatalog, idx.tableId()).partitions()))
+ ));
+
+ droppedObjects.clear();
+ droppedCatalogVersions.forEach(oldCatalog ->
oldCatalog.tables().stream()
+ .filter(tbl -> catalog.table(tbl.id()) == null)
+ .filter(tbl -> droppedObjects.add(tbl.id()))
+ .forEach(tbl -> events.add(new DestroyTableEvent(tbl.id(),
tableZoneDescriptor(oldCatalog, tbl.id()).partitions()))));
+
// On recovery phase, we must register catalog from the snapshot.
// In other cases, it is ok to rewrite an existed version, because
it's exactly the same.
registerCatalog(catalog);
- truncateUpTo(catalog);
- return nullCompletedFuture();
+ List<CompletableFuture<?>> eventFutures = new
ArrayList<>(events.size());
+
+ for (Fireable fireEvent : events) {
+ eventFutures.add(fireEvent(
+ fireEvent.eventType(),
+ fireEvent.createEventParameters(causalityToken,
catalog.version())
+ ));
+ }
+
+ return allOf(eventFutures.toArray(CompletableFuture[]::new))
+ .whenComplete((ignore, err) -> {
+ if (err != null) {
+ LOG.warn("Failed to compact catalog.", err);
+ //TODO: IGNITE-14611 Pass exception to an error
handler?
+ } else {
+ truncateUpTo(catalog);
Review Comment:
I'm not sure if we can truncate catalog right after events are fired, and be
sure all components retrieved all the required info from catalog during
synchronous `notify` call.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]