lucliu1108 commented on code in PR #22417:
URL: https://github.com/apache/kafka/pull/22417#discussion_r3350628260
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -173,66 +178,78 @@ public void setGlobalProcessorContext(final
InternalProcessorContext<?, ?> globa
}
@Override
- public Set<String> initialize() {
+ public Optional<Set<String>> initialize() {
droppedRecordsSensor = droppedRecordsSensor(
Thread.currentThread().getName(),
globalProcessorContext.taskId().toString(),
globalProcessorContext.metrics()
);
- final Map<TopicPartition, StateStore> wrappedStores = new HashMap<>();
- for (final StateStore stateStore : topology.globalStateStores()) {
- final List<TopicPartition> storePartitions =
topicPartitionsForStore(stateStore);
- final StateStore maybeWrappedStore =
LegacyCheckpointingStateStore.maybeWrapStore(
- stateStore, eosEnabled, new HashSet<>(storePartitions),
stateDirectory, null, logPrefix);
- try {
- maybeWrappedStore.init(globalProcessorContext,
maybeWrappedStore);
- } catch (final ProcessorStateException e) {
- if (eosEnabled) {
- log.warn("{}Detected unclean shutdown for global store {}.
" +
- "Wiping global state directory.", logPrefix,
stateStore.name(), e);
- try {
-
Utils.delete(stateDirectory.globalStateDir().getAbsoluteFile());
- } catch (final IOException ioe) {
- e.addSuppressed(ioe);
+ try {
+ final Map<TopicPartition, StateStore> wrappedStores = new
HashMap<>();
+ for (final StateStore stateStore : topology.globalStateStores()) {
+ final List<TopicPartition> storePartitions =
topicPartitionsForStore(stateStore);
+ final StateStore maybeWrappedStore =
LegacyCheckpointingStateStore.maybeWrapStore(
+ stateStore, eosEnabled, new
HashSet<>(storePartitions), stateDirectory, null, logPrefix);
+ try {
+ maybeWrappedStore.init(globalProcessorContext,
maybeWrappedStore);
+ } catch (final ProcessorStateException e) {
+ if (eosEnabled) {
+ log.warn("{}Detected unclean shutdown for global store
{}. " +
+ "Wiping global state directory.", logPrefix,
stateStore.name(), e);
+ try {
+
Utils.delete(stateDirectory.globalStateDir().getAbsoluteFile());
+ } catch (final IOException ioe) {
+ e.addSuppressed(ioe);
+ }
}
+ throw e;
}
- throw e;
- }
- for (final TopicPartition storePartition : storePartitions) {
- wrappedStores.put(storePartition, maybeWrappedStore);
+ for (final TopicPartition storePartition : storePartitions) {
+ wrappedStores.put(storePartition, maybeWrappedStore);
+ }
}
- }
- // migrate offsets from legacy checkpoint file into the stores
- LegacyCheckpointingStateStore.migrateLegacyOffsets(logPrefix,
stateDirectory, null, wrappedStores);
-
- for (final StateStoreMetadata metadata : storeMetadata.values()) {
- // load the committed offsets from the store
- final StateStore store = metadata.stateStore;
- if (store.persistent()) {
- for (final TopicPartition partition :
metadata.changelogPartitions) {
- final Long offset = store.committedOffset(partition);
- if (offset != null) {
- currentOffsets.put(partition, offset);
+ // migrate offsets from legacy checkpoint file into the stores
+ LegacyCheckpointingStateStore.migrateLegacyOffsets(logPrefix,
stateDirectory, null, wrappedStores);
+
+ for (final StateStoreMetadata metadata : storeMetadata.values()) {
+ if (shouldStopBootstrappingSupplier.getAsBoolean()) {
Review Comment:
Yes. First path is supplier check between stores, the other one is
wakeupException thrown by methods like `poll`/`partitionsFor`/...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]