ableegoldman commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1411413767


##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -175,13 +176,15 @@ public class KafkaStreams implements AutoCloseable {
     private final KafkaClientSupplier clientSupplier;
     protected final TopologyMetadata topologyMetadata;
     private final QueryableStoreProvider queryableStoreProvider;
+    private final StandbyUpdateListener delegatingStandbyUpdateListener;
 
     GlobalStreamThread globalStreamThread;
     private KafkaStreams.StateListener stateListener;
     private StateRestoreListener globalStateRestoreListener;
     private boolean oldHandler;
     private BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler;
     private final Object changeThreadCount = new Object();
+    private StandbyUpdateListener globalStandbyListener;

Review Comment:
   I know you're just following the established pattern here, but it always 
felt weird to me that we have two different top-level fields for the restore 
listener. I think it makes more sense for the "global" standby listener to just 
be completely encapsulated by the DelegatingStandbyUpdateListener class, and 
then make that class static. 
   
   IMO we should also name it `userStandbyListener` rather than 
`globalStandbyListener`, to avoid confusion with global state stores. I decided 
to just whip up a quick PR with all these fixes for the existing restore 
listener, so you can take a look at https://github.com/apache/kafka/pull/14886 
to see exactly what I'm suggesting we do for the standby listener.
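   To make the suggestion concrete, here is a minimal, self-contained sketch of a static delegating class that owns the user's listener. It does not reproduce the code in #14886: `UpdateListener`, `StreamsClientSketch`, and `DelegatingUpdateListener` are hypothetical stand-ins (the real `StandbyUpdateListener` takes a `TopicPartition` and more parameters), chosen only so the example compiles without Kafka on the classpath.

```java
// Hypothetical, trimmed-down stand-in for StandbyUpdateListener so the sketch
// compiles without Kafka; the real interface takes a TopicPartition and more.
interface UpdateListener {
    void onUpdateStart(String changelogPartition, String storeName, long startingOffset);
}

class StreamsClientSketch {
    // The only top-level listener field: internals are always handed this delegate.
    private final DelegatingUpdateListener delegatingStandbyUpdateListener =
        new DelegatingUpdateListener();

    // The public setter just forwards the user's listener into the delegate.
    void setStandbyUpdateListener(final UpdateListener listener) {
        delegatingStandbyUpdateListener.setUserStandbyListener(listener);
    }

    UpdateListener standbyUpdateListener() {
        return delegatingStandbyUpdateListener;
    }

    // Static: it needs no reference to the enclosing client, and the user's
    // listener lives here rather than in a second field on the client.
    static final class DelegatingUpdateListener implements UpdateListener {
        // volatile because it may be set and read from different threads
        private volatile UpdateListener userStandbyListener;

        void setUserStandbyListener(final UpdateListener listener) {
            this.userStandbyListener = listener;
        }

        @Override
        public void onUpdateStart(final String changelogPartition,
                                  final String storeName,
                                  final long startingOffset) {
            final UpdateListener listener = userStandbyListener;
            // No-op until the user has registered a listener.
            if (listener != null) {
                listener.onUpdateStart(changelogPartition, storeName, startingOffset);
            }
        }
    }
}
```

   With this shape the client keeps a single `delegatingStandbyUpdateListener` field, and internal components can be handed the delegate once, before the user has registered anything.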



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -672,6 +678,8 @@ private int restoreChangelog(final Task task, final ChangelogMetadata changelogM
                 } catch (final Exception e) {
                     throw new StreamsException("State restore listener failed on batch restored", e);
                 }
+            } else if (changelogMetadata.stateManager.taskType() == TaskType.STANDBY) {
+                standbyUpdateListener.onBatchLoaded(partition, storeName, stateManager.taskId(), currentOffset, numRecords, storeMetadata.endOffset());

Review Comment:
   We should probably put this in a try block, like the restore listener does 
right above it:
   ```suggestion
                   try {
                       standbyUpdateListener.onBatchLoaded(partition, storeName, stateManager.taskId(), currentOffset, numRecords, storeMetadata.endOffset());
                   } catch (final Exception e) {
                       throw new StreamsException("Standby update listener failed on batch loaded", e);
                   }
   ```
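   A minimal sketch of the wrap-and-rethrow pattern being suggested, assuming a hypothetical `StreamsExceptionStandIn` in place of Kafka's `StreamsException` and a trimmed-down, hypothetical `BatchLoadedCallback` in place of `onBatchLoaded`. It shows that the user's exception survives as the cause, while the message identifies which callback failed.

```java
// Hypothetical stand-in for org.apache.kafka.streams.errors.StreamsException,
// so the sketch runs without Kafka on the classpath.
class StreamsExceptionStandIn extends RuntimeException {
    StreamsExceptionStandIn(final String message, final Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical single-method callback standing in for onBatchLoaded.
interface BatchLoadedCallback {
    void onBatchLoaded(String storeName, long batchEndOffset, long numRecords);
}

final class ListenerInvocation {
    private ListenerInvocation() { }

    // Any exception thrown by user code is wrapped in a fatal exception whose
    // message names the listener and the callback; the original is kept as the cause.
    static void notifyBatchLoaded(final BatchLoadedCallback listener,
                                  final String storeName,
                                  final long batchEndOffset,
                                  final long numRecords) {
        try {
            listener.onBatchLoaded(storeName, batchEndOffset, numRecords);
        } catch (final Exception e) {
            throw new StreamsExceptionStandIn("Standby update listener failed on batch loaded", e);
        }
    }
}
```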



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -1012,6 +1017,8 @@ private void prepareChangelogs(final Map<TaskId, Task> tasks,
                 // no records to restore; in this case we just initialize the sensor to zero
                 final long recordsToRestore = Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L);
                 task.recordRestoration(time, recordsToRestore, true);
+            }  else if (changelogMetadata.stateManager.taskType() == TaskType.STANDBY) {
+                standbyUpdateListener.onUpdateStart(partition, storeName, startOffset);

Review Comment:
   Same here: we should wrap this in a try-catch that mirrors the #onRestoreStart 
block above:
   ```suggestion
                   try {
                       standbyUpdateListener.onUpdateStart(partition, storeName, startOffset);
                   } catch (final Exception e) {
                       throw new StreamsException("Standby update listener failed on update start", e);
                   }
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -980,25 +988,22 @@ private void prepareChangelogs(final Map<TaskId, Task> tasks,
             restoreConsumer.seekToBeginning(newPartitionsWithoutStartOffset);
         }
 
-        // do not trigger restore listener if we are processing standby tasks
         for (final ChangelogMetadata changelogMetadata : newPartitionsToRestore) {
-            if (changelogMetadata.stateManager.taskType() == Task.TaskType.ACTIVE) {
-                final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
-                final TopicPartition partition = storeMetadata.changelogPartition();
-                final String storeName = storeMetadata.store().name();
-
-                long startOffset = 0L;
-                try {
-                    startOffset = restoreConsumer.position(partition);
-                } catch (final TimeoutException swallow) {
-                    // if we cannot find the starting position at the beginning, just use the default 0L
-                } catch (final KafkaException e) {
-                    // this also includes InvalidOffsetException, which should not happen under normal
-                    // execution, hence it is also okay to wrap it as fatal StreamsException
-                    throw new StreamsException("Restore consumer get unexpected error trying to get the position " +
+            final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
+            final TopicPartition partition = storeMetadata.changelogPartition();
+            final String storeName = storeMetadata.store().name();
+            long startOffset = 0L;
+            try {
+                startOffset = restoreConsumer.position(partition);
+            } catch (final TimeoutException swallow) {
+                // if we cannot find the starting position at the beginning, just use the default 0L
+            } catch (final KafkaException e) {
+                // this also includes InvalidOffsetException, which should not happen under normal
+                // execution, hence it is also okay to wrap it as fatal StreamsException
+                throw new StreamsException("Restore consumer get unexpected error trying to get the position " +
                         " of " + partition, e);
-                }
-
+            }
+            if (changelogMetadata.stateManager.taskType() == Task.TaskType.ACTIVE) {

Review Comment:
   GitHub won't let me leave a comment on this line, but I just noticed that the 
error message in the restore listener's catch block is incorrect. Can you slip in 
a fix for this as well, by changing the exception string on line 1010 from 
"on batch restored" to "on restore start"?

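   For reference, a simplified sketch of what the restructured loop in this hunk does: the position lookup now runs for every new changelog (with the same 0L fallback on timeout), and only the listener notification depends on the task type. `PrepareChangelogSketch`, `PositionTimeoutException`, and the string notifications are hypothetical; the real code calls `restoreConsumer.position(partition)` and notifies the two listeners.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical sketch: position lookup for every changelog, dispatch by task type.
final class PrepareChangelogSketch {
    enum TaskType { ACTIVE, STANDBY }

    // Stand-in for org.apache.kafka.common.errors.TimeoutException.
    static final class PositionTimeoutException extends RuntimeException { }

    // Records which (hypothetical) callback fired, in order.
    final List<String> notifications = new ArrayList<>();

    long startOffset(final ToLongFunction<String> position, final String partition) {
        try {
            return position.applyAsLong(partition);
        } catch (final PositionTimeoutException swallow) {
            // Same fallback as the diff: if the position is unknown, use 0L.
            return 0L;
        }
    }

    void prepare(final TaskType taskType, final String partition, final String storeName,
                 final ToLongFunction<String> position) {
        // Computed for both task types, no longer only inside the ACTIVE branch.
        final long startOffset = startOffset(position, partition);
        if (taskType == TaskType.ACTIVE) {
            notifications.add("onRestoreStart " + storeName + "@" + startOffset);
        } else if (taskType == TaskType.STANDBY) {
            notifications.add("onUpdateStart " + storeName + "@" + startOffset);
        }
    }
}
```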


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -1032,13 +1039,25 @@ public void unregister(final Collection<TopicPartition> revokedChangelogs) {
                     // if the changelog is not in RESTORING, it means
                     // the corresponding onRestoreStart was not called; in this case
                     // we should not call onRestoreSuspended either
-                    if (changelogMetadata.stateManager.taskType() == Task.TaskType.ACTIVE &&
-                        changelogMetadata.state().equals(ChangelogState.RESTORING)) {
-                        try {
-                            final String storeName = changelogMetadata.storeMetadata.store().name();
-                            stateRestoreListener.onRestoreSuspended(partition, storeName, changelogMetadata.totalRestored);
-                        } catch (final Exception e) {
-                            throw new StreamsException("State restore listener failed on restore paused", e);
+                    if (changelogMetadata.state().equals(ChangelogState.RESTORING)) {
+                        final String storeName = changelogMetadata.storeMetadata.store().name();
+                        if (changelogMetadata.stateManager.taskType() == Task.TaskType.ACTIVE) {
+                            try {
+                                stateRestoreListener.onRestoreSuspended(partition, storeName, changelogMetadata.totalRestored);
+                            } catch (final Exception e) {
+                                throw new StreamsException("State restore listener failed on restore paused", e);
+                            }
+                        } else if (changelogMetadata.stateManager.taskType() == TaskType.STANDBY) {
+                            final StateStoreMetadata storeMetadata = changelogMetadata.stateManager.storeMetadata(partition);
+                            // endOffset and storeOffset may be unknown at this point
+                            final long storeOffset = storeMetadata.offset() != null ? storeMetadata.offset() : -1;
+                            final long endOffset = storeMetadata.endOffset() != null ? storeMetadata.endOffset() : -1;
+                            StandbyUpdateListener.SuspendReason suspendReason = StandbyUpdateListener.SuspendReason.MIGRATED;
+                            // Unregistering running standby tasks means the task has been promoted to active.
+                            if (changelogMetadata.stateManager.taskState() == Task.State.RUNNING) {
+                                suspendReason = StandbyUpdateListener.SuspendReason.PROMOTED;
+                            }

Review Comment:
   nit: we always prefer using final variables to make the logic as clear as 
possible:
   ```suggestion
                               // Unregistering running standby tasks means the task has been promoted to active.
                               final StandbyUpdateListener.SuspendReason suspendReason =
                                   changelogMetadata.stateManager.taskState() == Task.State.RUNNING
                                       ? StandbyUpdateListener.SuspendReason.PROMOTED
                                       : StandbyUpdateListener.SuspendReason.MIGRATED;
   ```
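   A tiny standalone sketch of the suggested logic, assuming hypothetical `SuspendReasonSketch` and `TaskState` types (the real code reads `Task.State` from the state manager, and `TaskState` here is only a subset stand-in): the reason is computed once into a `final` variable, and unknown offsets fall back to -1 as in the diff.

```java
final class SuspendReasonSketch {
    // Mirrors the enum in the new StandbyUpdateListener interface.
    enum SuspendReason { MIGRATED, PROMOTED }

    // Hypothetical subset stand-in for Task.State.
    enum TaskState { CREATED, RUNNING, CLOSED }

    private SuspendReasonSketch() { }

    static SuspendReason suspendReason(final TaskState standbyState) {
        // Unregistering a RUNNING standby task means it was promoted to active;
        // otherwise the reason defaults to MIGRATED.
        final SuspendReason suspendReason = standbyState == TaskState.RUNNING
            ? SuspendReason.PROMOTED
            : SuspendReason.MIGRATED;
        return suspendReason;
    }

    // Unknown (null) offsets are reported as -1, matching the null checks in the diff.
    static long offsetOrUnknown(final Long offset) {
        return offset != null ? offset : -1L;
    }
}
```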



##########
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+    enum SuspendReason {
+        MIGRATED,
+        PROMOTED
+    }
+
+    /**
+     * Method called upon the creation of the Standby Task.
+     *
+     * @param topicPartition   the TopicPartition of the Standby Task.
+     * @param storeName        the name of the store being watched by this Standby Task.
+     * @param startingOffset   the offset from which the Standby Task starts watching.
+     * @param currentEndOffset the current latest offset on the associated changelog partition.
+     */
+    void onUpdateStart(final TopicPartition topicPartition,
+                       final String storeName,
+                       final long startingOffset,
+                       final long currentEndOffset);
+
+    /**
+     * Method called after restoring a batch of records.  In this case the maximum size of the batch is whatever
+     * the value of the MAX_POLL_RECORDS is set to.
+     *

Review Comment:
   Yeah, sorry, I meant there needs to be a literal `<p>` between paragraphs, 
for when it gets compiled into javadocs. Like this:
   ```suggestion
        * <p>
   ```
   You can look at other javadocs for reference (though we do sometimes miss 
this and end up with mangled javadocs 😅 )
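   For illustration, a hypothetical javadoc'd method with the paragraph break in place. Javadoc comments are HTML, so a blank ` *` line on its own does not start a new paragraph in the generated docs; the explicit `<p>` does. `BatchListenerDocExample` is made up for this example and is not part of the PR.

```java
// Hypothetical interface, shown only to illustrate javadoc paragraph breaks.
interface BatchListenerDocExample {

    /**
     * Method called after loading a batch of records.
     * <p>
     * The maximum size of the batch is bounded by the consumer's
     * {@code max.poll.records} setting.
     *
     * @param storeName  the name of the store being updated
     * @param numRecords the number of records in the batch
     */
    void onBatchLoaded(String storeName, long numRecords);
}
```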
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
