ableegoldman commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1391818555


##########
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+    enum SuspendReason {
+        MIGRATED,
+        PROMOTED
+    }
+
+    /**
+     * Method called upon the creation of the Standby Task.

Review Comment:
   ```suggestion
        * Method called upon the initialization of the standby task, just 
before it begins to load from the changelog.
   ```
   nit: I won't comment this everywhere, and frankly it's not a big deal and 
I'm only pointing it out because it should be a quick fix with find&replace, 
but we usually don't capitalize terms like "Standby Task". Mainly to help 
differentiate between the common name and the class name. So for example 
"TopicPartition" is fine, but "Topic Partition" would be weird. Similarly, I 
would use "standby task" here, although "StandbyTask" is also correct 



##########
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+    enum SuspendReason {
+        MIGRATED,
+        PROMOTED
+    }
+
+    /**
+     * Method called upon the creation of the Standby Task.
+     *
+     * @param topicPartition   the TopicPartition of the Standby Task.
+     * @param storeName        the name of the store being watched by this 
Standby Task.
+     * @param startingOffset   the offset from which the Standby Task starts 
watching.
+     * @param currentEndOffset the current latest offset on the associated 
changelog partition.

Review Comment:
   ```suggestion
        * @param currentEndOffset the current end offset on the associated 
changelog partition.
   ```
   I guess you could also say "highest offset", but "latest" feels a bit 
ambiguous, at least to me personally



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -742,6 +756,30 @@ public void onRestoreSuspended(final TopicPartition 
topicPartition, final String
         }
     }
 
+    final class DelegatingStandbyUpdateListener implements 
StandbyUpdateListener {
+
+        @Override
+        public void onUpdateStart(final TopicPartition topicPartition, final 
String storeName, final long startingOffset, final long currentEndOffset) {

Review Comment:
   nit: these method signatures are all really long, can you break up the 
parameters? This is the format we use:
   ```suggestion
   public void onUpdateStart(final TopicPartition topicPartition, 
                             final String storeName, 
                             final long startingOffset, 
                             final long currentEndOffset) {
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -70,6 +71,7 @@
 import static 
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
 import static 
org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName;
 
+@SuppressWarnings("ClassFanOutComplexity")

Review Comment:
   eh, I wouldn't worry too much about it...I mean yes, the TaskManager class 
is too complex, but that's not on you to solve. Tbh I'm surprised this is what 
pushed it over the edge



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -982,23 +994,21 @@ private void prepareChangelogs(final Map<TaskId, Task> 
tasks,
 
         // do not trigger restore listener if we are processing standby tasks

Review Comment:
   this comment can be removed now



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -1012,6 +1022,8 @@ private void prepareChangelogs(final Map<TaskId, Task> 
tasks,
                 // no records to restore; in this case we just initialize the 
sensor to zero
                 final long recordsToRestore = 
Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L);
                 task.recordRestoration(time, recordsToRestore, true);
+            }  else if (changelogMetadata.stateManager.taskType() == 
TaskType.STANDBY && storeMetadata.endOffset() != null) {
+                standbyUpdateListener.onUpdateStart(partition, storeName, 
startOffset, storeMetadata.endOffset());

Review Comment:
   I notice we wrap the restore listener in a try-catch block, let's do the 
same for the standby listener. Maybe also log an error saying which listener 
threw the error inside the `catch` block in both cases?



##########
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+    enum SuspendReason {
+        MIGRATED,
+        PROMOTED
+    }
+
+    /**
+     * Method called upon the creation of the Standby Task.
+     *
+     * @param topicPartition   the TopicPartition of the Standby Task.
+     * @param storeName        the name of the store being watched by this 
Standby Task.
+     * @param startingOffset   the offset from which the Standby Task starts 
watching.
+     * @param currentEndOffset the current latest offset on the associated 
changelog partition.
+     */
+    void onUpdateStart(final TopicPartition topicPartition,
+                       final String storeName,
+                       final long startingOffset,
+                       final long currentEndOffset);
+
+    /**
+     * Method called after restoring a batch of records.  In this case the 
maximum size of the batch is whatever
+     * the value of the MAX_POLL_RECORDS is set to.
+     *

Review Comment:
   formatting: add a single <n> between paragraphs in the javadocs (although 
you don't need a closing </n> tag, nor do you need a <n> between the last 
paragraph and the `@param` section)



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -645,7 +650,12 @@ private int restoreChangelog(final Task task, final 
ChangelogMetadata changelogM
 
         if (numRecords != 0) {
             final List<ConsumerRecord<byte[], byte[]>> records = 
changelogMetadata.bufferedRecords.subList(0, numRecords);
-            stateManager.restore(storeMetadata, records);
+            final OptionalLong optionalLag = 
restoreConsumer.currentLag(partition);
+            Long currentLag = null;

Review Comment:
   just a nit, feel free to ignore, but we can simplify this a bit by just 
passing the `OptionalLong` directly into `#restore`. I know you're not 
"supposed" to use Optional as an argument but in this case it lets us do all 
the special-case handling in one place, and makes it clear that the currentLag 
may be empty/uninitialized when we go to use it in `#restore`



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -580,6 +583,17 @@ public void setGlobalStateRestoreListener(final 
StateRestoreListener globalState
         }
     }
 
+    public void setStandbyUpdateListener(final StandbyUpdateListener 
globalStandbyListener) {

Review Comment:
   Can you add javadocs (see above API for reference)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to