guozhangwang commented on a change in pull request #8248: URL: https://github.com/apache/kafka/pull/8248#discussion_r432000417
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java ########## @@ -811,17 +812,41 @@ private void prepareChangelogs(final Set<ChangelogMetadata> newPartitionsToResto } } + private RuntimeException invokeOnRestoreEnd(final TopicPartition partition, + final ChangelogMetadata changelogMetadata) { + // only trigger the store's specific listener to make sure we disable bulk loading before transition to standby + final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata; + final StateRestoreCallback restoreCallback = storeMetadata.restoreCallback(); + final String storeName = storeMetadata.store().name(); + if (restoreCallback instanceof StateRestoreListener) { + try { + ((StateRestoreListener) restoreCallback).onRestoreEnd(partition, storeName, changelogMetadata.totalRestored); + } catch (final RuntimeException e) { + return e; + } + } + return null; + } + @Override - public void remove(final Collection<TopicPartition> revokedChangelogs) { - // Only changelogs that are initialized that been added to the restore consumer's assignment + public void unregister(final Collection<TopicPartition> revokedChangelogs, + final boolean triggerOnRestoreEnd) { + final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null); + + // Only changelogs that are initialized have been added to the restore consumer's assignment final List<TopicPartition> revokedInitializedChangelogs = new ArrayList<>(); for (final TopicPartition partition : revokedChangelogs) { final ChangelogMetadata changelogMetadata = changelogs.remove(partition); if (changelogMetadata != null) { - if (changelogMetadata.state() != ChangelogState.REGISTERED) { + if (triggerOnRestoreEnd && changelogMetadata.state().equals(ChangelogState.RESTORING)) { Review comment: Yup, I propose to decouple these two and only allow restore callback at the per-store level and restore listener at the global level. We will always open the store with compaction disabled etc when we are transiting to restoring, and after we've done the restoration (for active) we will do a one-off compaction, and then reopen the store with configs overridden. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org