ableegoldman commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1411413767
########## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ##########
@@ -175,13 +176,15 @@ public class KafkaStreams implements AutoCloseable {
 private final KafkaClientSupplier clientSupplier;
 protected final TopologyMetadata topologyMetadata;
 private final QueryableStoreProvider queryableStoreProvider;
+ private final StandbyUpdateListener delegatingStandbyUpdateListener;
 GlobalStreamThread globalStreamThread;
 private KafkaStreams.StateListener stateListener;
 private StateRestoreListener globalStateRestoreListener;
 private boolean oldHandler;
 private BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler;
 private final Object changeThreadCount = new Object();
+ private StandbyUpdateListener globalStandbyListener;

Review Comment:
I know you're just following the established pattern here, but it always felt weird to me that we have two different top-level fields for the restore listener. I think it makes more sense for the "global" standby listener to just be completely encapsulated by the DelegatingStandbyUpdateListener class, and then make that class static. Imo we should also name it `userStandbyListener` rather than `globalStandbyListener` to avoid confusion with global state stores.
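The following is a minimal sketch of the shape suggested above. It is not the code from the PR, and it rests on a few assumptions. `SimpleStandbyListener` and `ListenerCallbackException` are hypothetical, simplified stand-ins for `StandbyUpdateListener` and `StreamsException`: a `String` replaces `TopicPartition`, and only one callback is modeled. The sketch shows a static nested delegating class that owns the `userStandbyListener` field itself, so the enclosing client needs no second top-level field. It also wraps the user callback in the try/catch pattern that the comments further down ask for.

```java
public class StandbyListenerSketch {

    /** Hypothetical, simplified stand-in for StandbyUpdateListener (one callback, String partition). */
    interface SimpleStandbyListener {
        void onUpdateStart(String partition, String storeName, long startingOffset);
    }

    /** Hypothetical stand-in for StreamsException. */
    static class ListenerCallbackException extends RuntimeException {
        ListenerCallbackException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Static nested class: it keeps the user's listener as its own field, so the enclosing
     * client holds only this one delegating instance instead of two top-level listener fields.
     */
    static final class DelegatingStandbyListener implements SimpleStandbyListener {
        // Named "user..." rather than "global..." to avoid confusion with global state stores.
        private SimpleStandbyListener userStandbyListener;

        void setUserStandbyListener(final SimpleStandbyListener userStandbyListener) {
            this.userStandbyListener = userStandbyListener;
        }

        @Override
        public void onUpdateStart(final String partition, final String storeName, final long startingOffset) {
            if (userStandbyListener == null) {
                return; // no user listener registered, so there is nothing to notify
            }
            try {
                userStandbyListener.onUpdateStart(partition, storeName, startingOffset);
            } catch (final Exception e) {
                // Surface failures in user code as a wrapped exception with a descriptive message
                throw new ListenerCallbackException("Standby update listener failed on update start", e);
            }
        }
    }
}
```

Because the nested class is `static`, it holds no implicit reference to an enclosing instance. The only state it carries is the user's listener.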
I decided to just whip up a quick PR with all these fixes for the existing restore listener, so you can just take a look at https://github.com/apache/kafka/pull/14886 to see exactly what I'm suggesting we do for the standby listener.

########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ##########
@@ -672,6 +678,8 @@ private int restoreChangelog(final Task task, final ChangelogMetadata changelogM
 } catch (final Exception e) {
 throw new StreamsException("State restore listener failed on batch restored", e);
 }
+ } else if (changelogMetadata.stateManager.taskType() == TaskType.STANDBY) {
+ standbyUpdateListener.onBatchLoaded(partition, storeName, stateManager.taskId(), currentOffset, numRecords, storeMetadata.endOffset());

Review Comment:
We should probably put this in a try block like the restore listener does right above this:

```suggestion
try {
    standbyUpdateListener.onBatchLoaded(partition, storeName, stateManager.taskId(), currentOffset, numRecords, storeMetadata.endOffset());
} catch (final Exception e) {
    throw new StreamsException("Standby update listener failed on batch loaded", e);
}
```

########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ##########
@@ -1012,6 +1017,8 @@ private void prepareChangelogs(final Map<TaskId, Task> tasks,
 // no records to restore; in this case we just initialize the sensor to zero
 final long recordsToRestore = Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L);
 task.recordRestoration(time, recordsToRestore, true);
+ } else if (changelogMetadata.stateManager.taskType() == TaskType.STANDBY) {
+ standbyUpdateListener.onUpdateStart(partition, storeName, startOffset);

Review Comment:
Same here, we should wrap it in a try-catch that mirrors the #onRestoreStart block above:

```suggestion
try {
    standbyUpdateListener.onUpdateStart(partition, storeName, startOffset);
} catch (final Exception e) {
    throw new StreamsException("Standby update listener failed on update start", e);
}
```

########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ##########
@@ -980,25 +988,22 @@ private void prepareChangelogs(final Map<TaskId, Task> tasks,
 restoreConsumer.seekToBeginning(newPartitionsWithoutStartOffset);
 }
- // do not trigger restore listener if we are processing standby tasks
 for (final ChangelogMetadata changelogMetadata : newPartitionsToRestore) {
- if (changelogMetadata.stateManager.taskType() == Task.TaskType.ACTIVE) {
- final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
- final TopicPartition partition = storeMetadata.changelogPartition();
- final String storeName = storeMetadata.store().name();
-
- long startOffset = 0L;
- try {
- startOffset = restoreConsumer.position(partition);
- } catch (final TimeoutException swallow) {
- // if we cannot find the starting position at the beginning, just use the default 0L
- } catch (final KafkaException e) {
- // this also includes InvalidOffsetException, which should not happen under normal
- // execution, hence it is also okay to wrap it as fatal StreamsException
- throw new StreamsException("Restore consumer get unexpected error trying to get the position " +
+ final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
+ final TopicPartition partition = storeMetadata.changelogPartition();
+ final String storeName = storeMetadata.store().name();
+ long startOffset = 0L;
+ try {
+ startOffset = restoreConsumer.position(partition);
+ } catch (final TimeoutException swallow) {
+ // if we cannot find the starting position at the beginning, just use the default 0L
+ } catch (final KafkaException e) {
+ // this also includes InvalidOffsetException, which should not happen under normal
+ // execution, hence it is also okay to wrap it as fatal StreamsException
+ throw new StreamsException("Restore consumer get unexpected error trying to get the position " +
 " of " + partition,
 e);
- }
-
+ }
+ if (changelogMetadata.stateManager.taskType() == Task.TaskType.ACTIVE) {

Review Comment:
GitHub won't let me leave a comment on this line, but I just noticed the error message is incorrect for the restore listener's catch block. Can you slip in a fix for this as well by changing the exception string on line 1010 from "on batch restored" to "on restore start"?

########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ##########
@@ -1032,13 +1039,25 @@ public void unregister(final Collection<TopicPartition> revokedChangelogs) {
 // if the changelog is not in RESTORING, it means
 // the corresponding onRestoreStart was not called; in this case
 // we should not call onRestoreSuspended either
- if (changelogMetadata.stateManager.taskType() == Task.TaskType.ACTIVE &&
- changelogMetadata.state().equals(ChangelogState.RESTORING)) {
- try {
- final String storeName = changelogMetadata.storeMetadata.store().name();
- stateRestoreListener.onRestoreSuspended(partition, storeName, changelogMetadata.totalRestored);
- } catch (final Exception e) {
- throw new StreamsException("State restore listener failed on restore paused", e);
+ if (changelogMetadata.state().equals(ChangelogState.RESTORING)) {
+ final String storeName = changelogMetadata.storeMetadata.store().name();
+ if (changelogMetadata.stateManager.taskType() == Task.TaskType.ACTIVE) {
+ try {
+ stateRestoreListener.onRestoreSuspended(partition, storeName, changelogMetadata.totalRestored);
+ } catch (final Exception e) {
+ throw new StreamsException("State restore listener failed on restore paused", e);
+ }
+ } else if (changelogMetadata.stateManager.taskType() == TaskType.STANDBY) {
+ final StateStoreMetadata storeMetadata = changelogMetadata.stateManager.storeMetadata(partition);
+ // endOffset and storeOffset may be unknown at this point
+ final long storeOffset = storeMetadata.offset() != null ? storeMetadata.offset() : -1;
+ final long endOffset = storeMetadata.endOffset() != null ? storeMetadata.endOffset() : -1;
+ StandbyUpdateListener.SuspendReason suspendReason = StandbyUpdateListener.SuspendReason.MIGRATED;
+ // Unregistering running standby tasks means the task has been promoted to active.
+ if (changelogMetadata.stateManager.taskState() == Task.State.RUNNING) {
+ suspendReason = StandbyUpdateListener.SuspendReason.PROMOTED;
+ }

Review Comment:
nit: we always prefer using final variables to make the logic as clear as possible

```suggestion
// Unregistering running standby tasks means the task has been promoted to active.
final StandbyUpdateListener.SuspendReason suspendReason = changelogMetadata.stateManager.taskState() == Task.State.RUNNING
    ? StandbyUpdateListener.SuspendReason.PROMOTED
    : StandbyUpdateListener.SuspendReason.MIGRATED;
```

########## streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java: ##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+ enum SuspendReason {
+ MIGRATED,
+ PROMOTED
+ }
+
+ /**
+ * Method called upon the creation of the Standby Task.
+ *
+ * @param topicPartition the TopicPartition of the Standby Task.
+ * @param storeName the name of the store being watched by this Standby Task.
+ * @param startingOffset the offset from which the Standby Task starts watching.
+ * @param currentEndOffset the current latest offset on the associated changelog partition.
+ */
+ void onUpdateStart(final TopicPartition topicPartition,
+ final String storeName,
+ final long startingOffset,
+ final long currentEndOffset);
+
+ /**
+ * Method called after restoring a batch of records. In this case the maximum size of the batch is whatever
+ * the value of the MAX_POLL_RECORDS is set to.
+ *

Review Comment:
Yeah, sorry, I meant there needs to be a literal `<p>` between paragraphs, for when it gets compiled into javadocs. Like this:

```suggestion
* <p>
```

You can look at other javadocs for reference (though we do sometimes miss this and end up with mangled javadocs 😅)

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org