ableegoldman commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1391848915
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -1012,6 +1022,8 @@ private void prepareChangelogs(final Map<TaskId, Task> tasks,
                 // no records to restore; in this case we just initialize the sensor to zero
                 final long recordsToRestore = Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L);
                 task.recordRestoration(time, recordsToRestore, true);
+            } else if (changelogMetadata.stateManager.taskType() == TaskType.STANDBY && storeMetadata.endOffset() != null) {

Review Comment:
Why do we only invoke the standby listener when we have the `endOffset` filled out? I don't know off the top of my head when it would be empty, but I would think the listener callbacks are still useful even without this one field filled in.

Also, users might rely on the standby listener callbacks being invoked. I think we should guarantee that the listener is always called (at least in the absence of app-wide errors that cause a shutdown).

If you're wondering what to do in the case of it being null, I would suggest just passing in `-1L` as a sentinel value.
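To make the sentinel idea concrete, here is a minimal sketch. The listener interface, its method name, and the `UNKNOWN_END_OFFSET` constant are all made up for illustration; none of them are the actual Kafka Streams API, and the real callback signature may differ.

```java
// Hypothetical stand-in for the standby listener under review (NOT the real Kafka Streams interface).
interface SketchStandbyListener {
    /**
     * @param endOffset the changelog end offset, or {@code -1} if it is not known yet
     */
    void onStart(String storeName, long startOffset, long endOffset);
}

// Hypothetical helper; the constant name is an assumption, only the value -1L comes from the review.
final class EndOffsets {
    static final long UNKNOWN_END_OFFSET = -1L;

    private EndOffsets() { }

    // Map a possibly-null end offset to a primitive so the callback can always be invoked.
    static long orSentinel(final Long endOffset) {
        return endOffset == null ? UNKNOWN_END_OFFSET : endOffset;
    }
}

final class SentinelDemo {
    public static void main(final String[] args) {
        // The user decides how to react to the sentinel; here it is simply printed as "unknown".
        final SketchStandbyListener listener = (store, start, end) -> {
            final String shownEnd = end == EndOffsets.UNKNOWN_END_OFFSET ? "unknown" : String.valueOf(end);
            System.out.println(store + " start=" + start + " end=" + shownEnd);
        };

        // The listener is called in both cases, whether or not the end offset is filled in.
        listener.onStart("counts-store", 42L, EndOffsets.orSentinel(null));
        listener.onStart("counts-store", 42L, EndOffsets.orSentinel(100L));
    }
}
```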
As long as we mention that this is a possibility in the javadocs for the `endOffset` argument, I don't see any problem with leaving it up to the user to decide how to react in this case.

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -701,7 +706,15 @@ private StandbyTask convertActiveToStandby(final StreamTask activeTask, final Se
     }

     private StreamTask convertStandbyToActive(final StandbyTask standbyTask, final Set<TopicPartition> partitions) {
-        return activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer);
+        final StreamTask streamTask = activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer);
+        final ProcessorStateManager stateManager = standbyTask.stateManager();
+        for (final TopicPartition partition : partitions) {
+            final ProcessorStateManager.StateStoreMetadata storeMetadata = stateManager.storeMetadata(partition);
+            if (storeMetadata != null && storeMetadata.endOffset() != null) {
+                standbyTaskUpdateListener.onUpdateSuspended(partition, storeMetadata.store().name(), storeMetadata.offset(), storeMetadata.endOffset(), StandbyUpdateListener.SuspendReason.PROMOTED);

Review Comment:
Seems like this is the only place where we invoke the `onUpdateSuspended` callback, i.e. we're missing the `SuspendReason.MIGRATED` case, right? We can just look to the active restore listener and follow that same example: it looks like it invokes the analogous `#onRestoreSuspended` callback in the StoreChangelogReader, specifically in the `#unregister` method.

Personally, I think that would be the best place to invoke the standby listener's `#onUpdateSuspended`, not just in the `MIGRATED` case but also for `PROMOTED`, so we can keep the logic in one place. And `#unregister` is the perfect place to do so, since it always gets invoked whether the task is being closed or recycled.
You can just add a parameter to the `#unregister` method to pass in which of those two options it was.

Keeping everything in the StoreChangelogReader also helps us avoid some gnarly questions about special case handling, because the question of when a task is closed can actually be pretty complicated when you look at it within the TaskManager: for example, corrupted tasks may be closed and revived, opening up questions about whether and when to invoke `#onUpdateSuspended` if the task is going to be revived again. But within the StoreChangelogReader we know that each task should have a 1:1 ratio of calls to `prepareChangelogs` and `unregister`, so it's much easier to reason about.
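
Roughly what I have in mind for `#unregister`, as a simplified sketch rather than the actual StoreChangelogReader code. Everything here is hypothetical: partitions are plain strings, the listener is replaced by a list of recorded events, and the mapping of "closed" to `MIGRATED` and "recycled" to `PROMOTED` is my assumption about how the new parameter would be used.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Mirrors the two reasons named above; the real enum is StandbyUpdateListener.SuspendReason.
enum SketchSuspendReason { MIGRATED, PROMOTED }

// Hypothetical, heavily simplified changelog reader (NOT the real StoreChangelogReader).
final class SketchChangelogReader {
    // Offsets of the changelog partitions that are currently registered.
    private final Map<String, Long> offsets = new HashMap<>();
    // Stands in for the listener: each callback is recorded as a string.
    final List<String> events = new ArrayList<>();

    // Counterpart of prepareChangelogs: called once per partition when updating begins.
    void prepare(final String partition, final long startOffset) {
        offsets.put(partition, startOffset);
        events.add("start:" + partition);
    }

    // The caller passes in why the task is going away, so the suspend
    // callback fires from exactly one place for both reasons.
    void unregister(final List<String> partitions, final SketchSuspendReason reason) {
        for (final String partition : partitions) {
            final Long offset = offsets.remove(partition);
            // Only notify for partitions that were actually registered.
            if (offset != null) {
                events.add("suspended:" + partition + ":" + reason);
            }
        }
    }
}

final class UnregisterDemo {
    public static void main(final String[] args) {
        final SketchChangelogReader reader = new SketchChangelogReader();
        reader.prepare("store-changelog-0", 5L);
        // Task recycled into an active task (assumed to correspond to PROMOTED).
        reader.unregister(List.of("store-changelog-0"), SketchSuspendReason.PROMOTED);
        reader.events.forEach(System.out::println);
    }
}
```

Because each `prepare` is paired with one `unregister`, a second `unregister` for the same partition is a no-op in this sketch, which is the property that makes the callback count easy to reason about.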