UladzislauBlok commented on code in PR #22417:
URL: https://github.com/apache/kafka/pull/22417#discussion_r3330149215
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -348,7 +357,21 @@ private void reprocessState(final StateStoreMetadata storeMetadata) {
// TODO with https://issues.apache.org/jira/browse/KAFKA-10315 we can just call
// `poll(pollMS)` without adding the request timeout and do a more precise
// timeout handling
- final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollMsPlusRequestTimeout);
+ if (inErrorStateSupplier.getAsBoolean()) {
+ logBootstrapInterrupted(storeMetadata);
+ return;
+ }
Review Comment:
This looks like a lot of code duplication. Can we move it into a dedicated
method, or keep the check at a higher level?
AFAIU, once restoration of the current store completes and we move on to the
next one, we'll interrupt the bootstrap anyway. It's a trade-off: we react to
shutdown only between stores, but we avoid checking the same condition n times.
```java
for (final StateStoreMetadata metadata : storeMetadata.values()) {
    if (inErrorStateSupplier.getAsBoolean()) {
        log.info("Global store bootstrap interrupted by shutdown before starting {}", metadata.stateStore.name());
        break;
    }
    ...
}
```
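For the "dedicated method" option, a minimal sketch of what I have in mind. `BootstrapGuard` and `interrupted(...)` are hypothetical names, not existing code in the PR, and `System.out` stands in for the real logger:

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper: wraps the repeated "check flag, log, bail out" pattern
// so each call site is a single line.
public class BootstrapGuard {
    private final BooleanSupplier inErrorStateSupplier;

    public BootstrapGuard(final BooleanSupplier inErrorStateSupplier) {
        this.inErrorStateSupplier = inErrorStateSupplier;
    }

    // Returns true (and logs) when the caller should stop bootstrapping.
    public boolean interrupted(final String storeName) {
        if (inErrorStateSupplier.getAsBoolean()) {
            System.out.println("Global store bootstrap interrupted by shutdown: " + storeName);
            return true;
        }
        return false;
    }
}
```

Each call site would then shrink to something like `if (guard.interrupted(name)) return;`.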
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##########
@@ -299,7 +300,13 @@ public void run() {
if (size != -1L) {
cache.resize(size);
}
- stateConsumer.pollAndUpdate();
+ try {
+ stateConsumer.pollAndUpdate();
+ } catch (final WakeupException e) {
+ if (!inErrorState()) {
+ throw e;
+ }
+ }
Review Comment:
I don't think this is part of bootstrapping, is it?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##########
@@ -429,8 +437,21 @@ private StateConsumer initialize() {
);
}
+ if (inErrorState()) {
+ closeStateConsumer(stateConsumer, false);
+ return null;
+ }
+
setState(RUNNING);
return stateConsumer;
+ } catch (final WakeupException e) {
+ closeStateConsumer(stateConsumer, false);
+ if (inErrorState()) {
+ log.info("Global thread initialization interrupted by shutdown");
+ } else {
+ startupException = new StreamsException(
+ "Unexpected wakeup during initialization of GlobalStreamThread", e);
+ }
Review Comment:
I think this part should be enough:
```java
if (inErrorState()) {
    closeStateConsumer(stateConsumer, false);
    return null;
}
```
Do we need to catch `WakeupException` at all?
UPD: The overall idea is to `break` out of execution in `GlobalStateManagerImpl`
and then verify whether it was interrupted (by checking `inErrorState()`).
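
To make that concrete, a rough, self-contained sketch of the flow. `BootstrapSketch`, `restoreAll` and `initialize` are hypothetical stand-ins for the manager and thread code, store names replace `StateStoreMetadata`, and the real restore work is reduced to a list append:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

public class BootstrapSketch {
    // Stand-in for the manager: restores stores in order and breaks out
    // before starting the next store once shutdown has been requested.
    static List<String> restoreAll(final List<String> storeNames,
                                   final BooleanSupplier inErrorState) {
        final List<String> restored = new ArrayList<>();
        for (final String name : storeNames) {
            if (inErrorState.getAsBoolean()) {
                break;
            }
            restored.add(name); // placeholder for the actual restoration
        }
        return restored;
    }

    // Stand-in for the thread's initialize(): after the manager returns,
    // check the flag once; null signals "interrupted, do not start".
    static String initialize(final List<String> storeNames,
                             final BooleanSupplier inErrorState) {
        restoreAll(storeNames, inErrorState);
        if (inErrorState.getAsBoolean()) {
            return null;
        }
        return "RUNNING";
    }
}
```

With this shape the manager never throws on shutdown, so the caller needs only the single flag check and no extra `catch`.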