UladzislauBlok commented on code in PR #22417:
URL: https://github.com/apache/kafka/pull/22417#discussion_r3350170134
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -173,66 +178,78 @@ public void setGlobalProcessorContext(final
InternalProcessorContext<?, ?> globa
}
@Override
- public Set<String> initialize() {
+ public Optional<Set<String>> initialize() {
droppedRecordsSensor = droppedRecordsSensor(
Thread.currentThread().getName(),
globalProcessorContext.taskId().toString(),
globalProcessorContext.metrics()
);
- final Map<TopicPartition, StateStore> wrappedStores = new HashMap<>();
- for (final StateStore stateStore : topology.globalStateStores()) {
- final List<TopicPartition> storePartitions =
topicPartitionsForStore(stateStore);
- final StateStore maybeWrappedStore =
LegacyCheckpointingStateStore.maybeWrapStore(
- stateStore, eosEnabled, new HashSet<>(storePartitions),
stateDirectory, null, logPrefix);
- try {
- maybeWrappedStore.init(globalProcessorContext,
maybeWrappedStore);
- } catch (final ProcessorStateException e) {
- if (eosEnabled) {
- log.warn("{}Detected unclean shutdown for global store {}.
" +
- "Wiping global state directory.", logPrefix,
stateStore.name(), e);
- try {
-
Utils.delete(stateDirectory.globalStateDir().getAbsoluteFile());
- } catch (final IOException ioe) {
- e.addSuppressed(ioe);
+ try {
+ final Map<TopicPartition, StateStore> wrappedStores = new
HashMap<>();
+ for (final StateStore stateStore : topology.globalStateStores()) {
+ final List<TopicPartition> storePartitions =
topicPartitionsForStore(stateStore);
+ final StateStore maybeWrappedStore =
LegacyCheckpointingStateStore.maybeWrapStore(
+ stateStore, eosEnabled, new
HashSet<>(storePartitions), stateDirectory, null, logPrefix);
+ try {
+ maybeWrappedStore.init(globalProcessorContext,
maybeWrappedStore);
+ } catch (final ProcessorStateException e) {
+ if (eosEnabled) {
+ log.warn("{}Detected unclean shutdown for global store
{}. " +
+ "Wiping global state directory.", logPrefix,
stateStore.name(), e);
+ try {
+
Utils.delete(stateDirectory.globalStateDir().getAbsoluteFile());
+ } catch (final IOException ioe) {
+ e.addSuppressed(ioe);
+ }
}
+ throw e;
}
- throw e;
- }
- for (final TopicPartition storePartition : storePartitions) {
- wrappedStores.put(storePartition, maybeWrappedStore);
+ for (final TopicPartition storePartition : storePartitions) {
+ wrappedStores.put(storePartition, maybeWrappedStore);
+ }
}
- }
- // migrate offsets from legacy checkpoint file into the stores
- LegacyCheckpointingStateStore.migrateLegacyOffsets(logPrefix,
stateDirectory, null, wrappedStores);
-
- for (final StateStoreMetadata metadata : storeMetadata.values()) {
- // load the committed offsets from the store
- final StateStore store = metadata.stateStore;
- if (store.persistent()) {
- for (final TopicPartition partition :
metadata.changelogPartitions) {
- final Long offset = store.committedOffset(partition);
- if (offset != null) {
- currentOffsets.put(partition, offset);
+ // migrate offsets from legacy checkpoint file into the stores
+ LegacyCheckpointingStateStore.migrateLegacyOffsets(logPrefix,
stateDirectory, null, wrappedStores);
+
+ for (final StateStoreMetadata metadata : storeMetadata.values()) {
+ if (shouldStopBootstrappingSupplier.getAsBoolean()) {
Review Comment:
Just to confirm: we either catch the "interrupted bootstrapping" condition on
the next iteration or when handling `WakeupException`. That makes sense.
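To make sure I read the two exit paths correctly, here is a minimal sketch of how I understand them. It is not the PR code: `BootstrapSketch`, `Restorer` and `WakeupSignal` are hypothetical names, and `WakeupSignal` only stands in for Kafka's `WakeupException` so the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

public class BootstrapSketch {
    // Hypothetical stand-in for org.apache.kafka.common.errors.WakeupException.
    static class WakeupSignal extends RuntimeException { }

    // Hypothetical stand-in for the blocking restore work done per store.
    interface Restorer {
        void restore(String storeName);
    }

    // Returns Optional.empty() when bootstrapping was interrupted,
    // otherwise the names of the stores that were restored.
    static Optional<List<String>> bootstrap(final List<String> stores,
                                            final BooleanSupplier shouldStop,
                                            final Restorer restorer) {
        final List<String> restored = new ArrayList<>();
        try {
            for (final String store : stores) {
                // Path 1: the stop flag is observed at the top of the next iteration.
                if (shouldStop.getAsBoolean()) {
                    return Optional.empty();
                }
                restorer.restore(store);
                restored.add(store);
            }
        } catch (final WakeupSignal e) {
            // Path 2: a blocking call was woken up in the middle of an iteration.
            if (shouldStop.getAsBoolean()) {
                return Optional.empty();
            }
            throw e; // a wakeup without a stop request is unexpected in this sketch
        }
        return Optional.of(restored);
    }

    public static void main(final String[] args) {
        final AtomicBoolean stop = new AtomicBoolean(false);
        // The stop flag flips while the first store is restoring,
        // so the loop exits before touching the second store.
        System.out.println(bootstrap(List.of("a", "b"), stop::get, s -> stop.set(true)));
    }
}
```

In this sketch both paths collapse into the same empty `Optional`, so the caller needs only one check to tell an interrupted bootstrap apart from a completed one.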
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##########
@@ -429,6 +430,11 @@ private StateConsumer initialize() {
);
}
+ if (inErrorState()) {
+ closeStateConsumer(stateConsumer, false);
+ return null;
+ }
Review Comment:
For which scenario do we need this check?
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java:
##########
@@ -243,6 +244,60 @@ public void shouldTransitionToRunningOnStart() throws
Exception {
globalStreamThread.shutdown();
}
+ @Test
+ @Timeout(value = 30000, unit = TimeUnit.MILLISECONDS)
+ public void shouldShutdownDuringBootstrap() throws Exception {
+ initializeConsumer();
+ mockConsumer.updateEndOffsets(Collections.singletonMap(topicPartition,
1_000_000L));
+
+ final Thread shutdownThread = new Thread(() -> {
+ try {
+ TestUtils.waitForCondition(
+ () ->
stateRestoreListener.storeNameCalledStates.containsKey(MockStateRestoreListener.RESTORE_START),
+ 10 * 1000L,
+ "Bootstrap restore never started.");
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ globalStreamThread.shutdown();
+ });
+ shutdownThread.start();
+
+ startAndSwallowError();
+ shutdownThread.join();
+ globalStreamThread.join(5_000);
Review Comment:
There is already a `@Timeout` on the test, so I guess we can remove the timeout on this `join`.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java:
##########
@@ -243,6 +244,60 @@ public void shouldTransitionToRunningOnStart() throws
Exception {
globalStreamThread.shutdown();
}
+ @Test
+ @Timeout(value = 30000, unit = TimeUnit.MILLISECONDS)
+ public void shouldShutdownDuringBootstrap() throws Exception {
+ initializeConsumer();
+ mockConsumer.updateEndOffsets(Collections.singletonMap(topicPartition,
1_000_000L));
+
+ final Thread shutdownThread = new Thread(() -> {
+ try {
+ TestUtils.waitForCondition(
+ () ->
stateRestoreListener.storeNameCalledStates.containsKey(MockStateRestoreListener.RESTORE_START),
+ 10 * 1000L,
+ "Bootstrap restore never started.");
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ globalStreamThread.shutdown();
+ });
+ shutdownThread.start();
Review Comment:
`Runnable` + `Executors` would be better IMO, but this is super minor. Feel
free to ignore this comment if you prefer the current approach.
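For reference, a rough sketch of what I have in mind, using only `java.util.concurrent`. The latch and the flag are hypothetical stand-ins for `TestUtils.waitForCondition(...)` and `globalStreamThread.shutdown()`, so this is the shape of the idea and not a drop-in replacement for the test code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public class ExecutorShutdownSketch {
    // Returns true if the background task ran the (simulated) shutdown.
    static boolean shutdownOnceRestoreStarts() {
        // Hypothetical stand-in for "bootstrap restore has started".
        final CountDownLatch restoreStarted = new CountDownLatch(1);
        // Hypothetical stand-in for globalStreamThread.shutdown().
        final AtomicBoolean shutdownCalled = new AtomicBoolean(false);

        final ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            final Callable<Void> task = () -> {
                if (!restoreStarted.await(10, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("Bootstrap restore never started.");
                }
                shutdownCalled.set(true);
                return null;
            };
            final Future<Void> shutdownTask = executor.submit(task);

            // Simulates the main test thread reaching the restore phase.
            restoreStarted.countDown();

            // Unlike Thread.join(), Future.get() rethrows a failure from the
            // task as an ExecutionException, so the test cannot miss it.
            shutdownTask.get(5, TimeUnit.SECONDS);
            return shutdownCalled.get();
        } catch (final InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Shutdown task failed", e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(final String[] args) {
        System.out.println("shutdown called: " + shutdownOnceRestoreStarts());
    }
}
```

The main benefit over a raw `Thread` is that an exception thrown inside the task surfaces through `Future.get()`, whereas the `RuntimeException` thrown inside the current `shutdownThread` is never seen by the test thread.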