chickenchickenlove commented on code in PR #22417:
URL: https://github.com/apache/kafka/pull/22417#discussion_r3328152017
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -493,7 +516,20 @@ private void restoreState(final StateStoreMetadata storeMetadata) {
// TODO with https://issues.apache.org/jira/browse/KAFKA-10315 we can just call
// `poll(pollMS)` without adding the request timeout and do a more precise
// timeout handling
- final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollMsPlusRequestTimeout);
+ if (inErrorStateSupplier.getAsBoolean()) {
+ logBootstrapInterrupted(storeMetadata);
+ return;
Review Comment:
Could we make the shutdown-interrupted bootstrap path explicit instead of
returning normally from `GlobalStateManagerImpl`?

Currently, when `inErrorStateSupplier.getAsBoolean()` is true,
`restoreState()` and `reprocessState()` simply return, so
`GlobalStateManagerImpl#initialize()` also returns as though bootstrap had
completed successfully. As a result, `GlobalStateUpdateTask#initialize()` may
go on to call `initTopology()`, `processorContext.initialize()`, and
`flushState()` even though shutdown has already been requested.
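
To make the problem concrete, here is a heavily simplified model of that flow. It is not the real Kafka code: `CurrentFlowSketch`, `initializeStateManager`, and the step strings are hypothetical stand-ins, assuming only that the state manager's early return looks identical to a successful one from the caller's side.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical, simplified model of the current flow; not the actual Kafka classes.
final class CurrentFlowSketch {

    // Records which steps ran so the effect of the early return is visible.
    static List<String> run(final BooleanSupplier inErrorState) {
        final List<String> steps = new ArrayList<>();
        initializeStateManager(inErrorState, steps);
        // The caller cannot distinguish a completed bootstrap from an
        // interrupted one, so it always continues with follow-up initialization.
        steps.add("initTopology");                // may call user Processor#init()
        steps.add("processorContext.initialize");
        steps.add("flushState");
        return steps;
    }

    // Stand-in for GlobalStateManagerImpl#initialize() -> restoreState().
    private static void initializeStateManager(final BooleanSupplier inErrorState,
                                               final List<String> steps) {
        if (inErrorState.getAsBoolean()) {
            steps.add("bootstrap interrupted");
            return; // returns normally, exactly like a successful bootstrap
        }
        steps.add("bootstrap completed");
    }

    public static void main(final String[] args) {
        // Even with shutdown requested, initTopology and the later steps still run.
        System.out.println(run(() -> true));
    }
}
```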

Since `initTopology()` can invoke user-provided `Processor#init()`, this could
needlessly open external resources during shutdown. Maybe this should use an
explicit internal signal instead: for example, a dedicated
bootstrap-interrupted exception that is caught only on the clean-shutdown path,
or an initialization status (e.g. completed/interrupted) returned from
`initialize()`, so that the follow-up initialization can be skipped.
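
A rough sketch of both options, under the same simplified model. Everything here is hypothetical and only illustrates the shape of the idea: `BootstrapStatus`, `BootstrapInterruptedException`, and the helper methods are not existing Kafka APIs, and the real change would thread the signal through `GlobalStateManagerImpl#initialize()` and `GlobalStateUpdateTask#initialize()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the two suggested options; names are illustrative only.
final class ExplicitBootstrapSketch {

    // Option 1: initialize() reports whether bootstrap actually finished.
    enum BootstrapStatus { COMPLETED, INTERRUPTED }

    static BootstrapStatus initializeStateManager(final BooleanSupplier inErrorState,
                                                  final List<String> steps) {
        if (inErrorState.getAsBoolean()) {
            steps.add("bootstrap interrupted");
            return BootstrapStatus.INTERRUPTED;
        }
        steps.add("bootstrap completed");
        return BootstrapStatus.COMPLETED;
    }

    static List<String> runWithStatus(final BooleanSupplier inErrorState) {
        final List<String> steps = new ArrayList<>();
        if (initializeStateManager(inErrorState, steps) == BootstrapStatus.INTERRUPTED) {
            return steps; // skip initTopology(), so no user Processor#init() runs
        }
        steps.add("initTopology");
        steps.add("processorContext.initialize");
        steps.add("flushState");
        return steps;
    }

    // Option 2: a dedicated internal exception, caught only on the clean-shutdown path.
    static final class BootstrapInterruptedException extends RuntimeException {
        BootstrapInterruptedException() {
            super("global state bootstrap interrupted by shutdown");
        }
    }

    static void restoreOrThrow(final BooleanSupplier inErrorState, final List<String> steps) {
        if (inErrorState.getAsBoolean()) {
            throw new BootstrapInterruptedException();
        }
        steps.add("bootstrap completed");
    }

    static List<String> runWithException(final BooleanSupplier inErrorState) {
        final List<String> steps = new ArrayList<>();
        try {
            restoreOrThrow(inErrorState, steps);
            steps.add("initTopology");
            steps.add("processorContext.initialize");
            steps.add("flushState");
        } catch (final BootstrapInterruptedException e) {
            // Expected during clean shutdown: record it, but do not treat it as a failure.
            steps.add("bootstrap interrupted");
        }
        return steps;
    }

    public static void main(final String[] args) {
        System.out.println(runWithStatus(() -> true));
        System.out.println(runWithException(() -> true));
        System.out.println(runWithStatus(() -> false));
    }
}
```

The status variant keeps the control flow visible in method signatures, at the cost of touching every caller. The exception variant avoids signature changes, but it would need care so that generic error handlers on other paths do not swallow it or report it as a real failure.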
What do you think?