ableegoldman commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1391848915


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -1012,6 +1022,8 @@ private void prepareChangelogs(final Map<TaskId, Task> 
tasks,
                 // no records to restore; in this case we just initialize the 
sensor to zero
                 final long recordsToRestore = 
Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L);
                 task.recordRestoration(time, recordsToRestore, true);
+            }  else if (changelogMetadata.stateManager.taskType() == 
TaskType.STANDBY && storeMetadata.endOffset() != null) {

Review Comment:
   Why do we only invoke the standby listener when we have the `endOffset` 
filled out? I don't know off the top of my head when it would be empty or not, 
but I would think the listener callbacks are still useful even without this one 
field filled in. Also, users might rely on the standby listener callbacks being 
invoked. I think we should guarantee that the listener is always called (at 
least in the absence of app-wide errors that cause a shutdown).
   
   If you're wondering what to do in the case of it being null, I would suggest 
just passing in `-1L` as a sentinel value. As long as we mention that this is 
a possibility in the javadocs for the `endOffset` argument, I don't see any 
problem with leaving it up to the user to decide how to react in this case.
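   
   To make the sentinel idea concrete, here is a minimal sketch. The class 
`EndOffsetSentinel`, the constant `UNKNOWN_END_OFFSET`, and both helper methods 
are hypothetical names for illustration only, not part of the PR or of Kafka 
Streams; the only thing taken from the suggestion above is the `-1L` value.

```java
public class EndOffsetSentinel {
    // Hypothetical constant: signals that the end offset was not known when the callback fired.
    public static final long UNKNOWN_END_OFFSET = -1L;

    // Maps a possibly-null end offset to the primitive value handed to the listener,
    // so the callback can always be invoked instead of being skipped on null.
    public static long endOffsetOrSentinel(final Long endOffset) {
        return endOffset == null ? UNKNOWN_END_OFFSET : endOffset;
    }

    // One way a user-side listener might react: check for the sentinel before doing arithmetic.
    public static String describe(final long startOffset, final long endOffset) {
        if (endOffset == UNKNOWN_END_OFFSET) {
            return "update started at " + startOffset + ", end offset unknown";
        }
        final long lag = Math.max(endOffset - startOffset, 0L);
        return "update started at " + startOffset + ", " + lag + " records behind";
    }
}
```

   With something like this, the `storeMetadata.endOffset() != null` condition 
would no longer need to gate the callback; the javadocs would just need to say 
that `-1L` means "unknown".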



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -701,7 +706,15 @@ private StandbyTask convertActiveToStandby(final 
StreamTask activeTask, final Se
     }
 
     private StreamTask convertStandbyToActive(final StandbyTask standbyTask, 
final Set<TopicPartition> partitions) {
-        return activeTaskCreator.createActiveTaskFromStandby(standbyTask, 
partitions, mainConsumer);
+        final StreamTask streamTask = 
activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, 
mainConsumer);
+        final ProcessorStateManager stateManager = standbyTask.stateManager();
+        for (final TopicPartition partition : partitions) {
+            final ProcessorStateManager.StateStoreMetadata storeMetadata = 
stateManager.storeMetadata(partition);
+            if (storeMetadata != null && storeMetadata.endOffset() != null) {
+                standbyTaskUpdateListener.onUpdateSuspended(partition, 
storeMetadata.store().name(), storeMetadata.offset(), 
storeMetadata.endOffset(), StandbyUpdateListener.SuspendReason.PROMOTED);

Review Comment:
   Seems like this is the only place where we invoke the `onUpdateSuspended` 
callback, i.e. we're missing the `SuspendReason.MIGRATED` case, right?
   
   We can just look to the active restore listener and follow that same example 
-- looks like it invokes the analogous `#onRestoreSuspended` callback in the 
StoreChangelogReader, specifically in the `#unregister` method. Personally, I 
think that would be the best place to invoke the standby listener's 
`#onUpdateSuspended`, not just in the `MIGRATED` case but also for `PROMOTED`, 
so we can keep the logic in one place. And `#unregister` is the perfect place 
to do so, since it always gets invoked whether the task is being closed or 
recycled. You can just add a parameter to the `#unregister` method to pass in 
which of those two options it was.
   
   Keeping everything in the StoreChangelogReader also helps us avoid some 
gnarly questions about special case handling, because the question of when a 
task is closed can actually be pretty complicated when you look at it within 
the TaskManager: for example, corrupted tasks may be closed and revived, opening 
up questions about whether and when to invoke `#onUpdateSuspended` if it's going 
to be revived again. But within the StoreChangelogReader we know that each task 
should have a 1:1 ratio of calls to `prepareChangelogs` and `unregister`, so 
it's much easier to reason about.
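   
   Roughly what I have in mind, as a simplified sketch rather than the real 
code: `ChangelogReaderSketch`, its `Listener` interface, `RecordingListener`, 
and the use of plain strings for partitions are all made up for illustration. 
Only the `prepareChangelogs`/`unregister` pairing and the two `SuspendReason` 
values come from the discussion above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ChangelogReaderSketch {
    public enum SuspendReason { MIGRATED, PROMOTED }

    // Hypothetical, simplified stand-in for the standby listener.
    public interface Listener {
        void onUpdateStart(String partition);
        void onUpdateSuspended(String partition, SuspendReason reason);
    }

    private final Listener listener;
    // Partitions that have been prepared but not yet unregistered.
    private final Set<String> prepared = new HashSet<>();

    public ChangelogReaderSketch(final Listener listener) {
        this.listener = listener;
    }

    // Fires the start callback once per partition that was not already prepared.
    public void prepareChangelogs(final Set<String> partitions) {
        for (final String partition : partitions) {
            if (prepared.add(partition)) {
                listener.onUpdateStart(partition);
            }
        }
    }

    // The caller says why the task is going away (closed vs. recycled), so both
    // MIGRATED and PROMOTED are reported from this single place.
    public void unregister(final Set<String> partitions, final SuspendReason reason) {
        for (final String partition : partitions) {
            // Only partitions that were actually prepared get a suspend callback,
            // which keeps start and suspend calls paired 1:1.
            if (prepared.remove(partition)) {
                listener.onUpdateSuspended(partition, reason);
            }
        }
    }

    // Test helper that records callbacks in order.
    public static class RecordingListener implements Listener {
        public final List<String> events = new ArrayList<>();

        @Override
        public void onUpdateStart(final String partition) {
            events.add("start " + partition);
        }

        @Override
        public void onUpdateSuspended(final String partition, final SuspendReason reason) {
            events.add("suspended " + partition + " " + reason);
        }
    }
}
```

   The point of the design is that the TaskManager only has to pass the reason 
through when it closes or recycles a task; it never has to decide on its own 
whether a callback is due.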


