ableegoldman commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1391818555
##########
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+    enum SuspendReason {
+        MIGRATED,
+        PROMOTED
+    }
+
+    /**
+     * Method called upon the creation of the Standby Task.

Review Comment:
```suggestion
     * Method called upon the initialization of the standby task, just before it begins to load from the changelog.
```
nit: I won't comment this everywhere, and frankly it's not a big deal and I'm only pointing it out because it should be a quick fix with find&replace, but we usually don't capitalize terms like "Standby Task". Mainly to help differentiate between the common name and the class name. So for example "TopicPartition" is fine, but "Topic Partition" would be weird. Similarly, I would use "standby task" here, although "StandbyTask" is also correct

##########
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+    enum SuspendReason {
+        MIGRATED,
+        PROMOTED
+    }
+
+    /**
+     * Method called upon the creation of the Standby Task.
+     *
+     * @param topicPartition the TopicPartition of the Standby Task.
+     * @param storeName the name of the store being watched by this Standby Task.
+     * @param startingOffset the offset from which the Standby Task starts watching.
+     * @param currentEndOffset the current latest offset on the associated changelog partition.

Review Comment:
```suggestion
     * @param currentEndOffset the current end offset on the associated changelog partition.
```
I guess you could also say "highest offset", but "latest" feels a bit ambiguous, at least to me personally

##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -742,6 +756,30 @@ public void onRestoreSuspended(final TopicPartition topicPartition, final String
         }
     }
+    final class DelegatingStandbyUpdateListener implements StandbyUpdateListener {
+
+        @Override
+        public void onUpdateStart(final TopicPartition topicPartition, final String storeName, final long startingOffset, final long currentEndOffset) {

Review Comment:
nit: these method signatures are all really long, can you break up the parameters? This is the format we use:
```suggestion
        public void onUpdateStart(final TopicPartition topicPartition,
                                  final String storeName,
                                  final long startingOffset,
                                  final long currentEndOffset) {
```

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -70,6 +71,7 @@
 import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
 import static org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName;
+@SuppressWarnings("ClassFanOutComplexity")

Review Comment:
eh, I wouldn't worry too much about it...I mean yes, the TaskManager class is too complex, but that's not on you to solve. Tbh I'm surprised this is what pushed it over the edge

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -982,23 +994,21 @@ private void prepareChangelogs(final Map<TaskId, Task> tasks,
                 // do not trigger restore listener if we are processing standby tasks

Review Comment:
this comment can be removed now

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -1012,6 +1022,8 @@ private void prepareChangelogs(final Map<TaskId, Task> tasks,
                     // no records to restore; in this case we just initialize the sensor to zero
                     final long recordsToRestore = Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L);
                     task.recordRestoration(time, recordsToRestore, true);
+                } else if (changelogMetadata.stateManager.taskType() == TaskType.STANDBY && storeMetadata.endOffset() != null) {
+                    standbyUpdateListener.onUpdateStart(partition, storeName, startOffset, storeMetadata.endOffset());

Review Comment:
I notice we wrap the restore listener in a try-catch block, let's do the same for the standby listener. Maybe also log an error saying which listener threw the error inside the `catch` block in both cases?

##########
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+    enum SuspendReason {
+        MIGRATED,
+        PROMOTED
+    }
+
+    /**
+     * Method called upon the creation of the Standby Task.
+     *
+     * @param topicPartition the TopicPartition of the Standby Task.
+     * @param storeName the name of the store being watched by this Standby Task.
+     * @param startingOffset the offset from which the Standby Task starts watching.
+     * @param currentEndOffset the current latest offset on the associated changelog partition.
+     */
+    void onUpdateStart(final TopicPartition topicPartition,
+                       final String storeName,
+                       final long startingOffset,
+                       final long currentEndOffset);
+
+    /**
+     * Method called after restoring a batch of records. In this case the maximum size of the batch is whatever
+     * the value of the MAX_POLL_RECORDS is set to.
+     *

Review Comment:
formatting: add a single `<p>` between paragraphs in the javadocs (although you don't need a closing `</p>` tag, nor do you need a `<p>` between the last paragraph and the `@param` section)

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -645,7 +650,12 @@ private int restoreChangelog(final Task task, final ChangelogMetadata changelogM
         if (numRecords != 0) {
             final List<ConsumerRecord<byte[], byte[]>> records = changelogMetadata.bufferedRecords.subList(0, numRecords);
-            stateManager.restore(storeMetadata, records);
+            final OptionalLong optionalLag = restoreConsumer.currentLag(partition);
+            Long currentLag = null;

Review Comment:
just a nit, feel free to ignore, but we can simplify this a bit by just passing the `OptionalLong` directly into `#restore`. I know you're not "supposed" to use Optional as an argument but in this case it lets us do all the special-case handling in one place, and makes it clear that the currentLag may be empty/uninitialized when we go to use it in `#restore`

##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -580,6 +583,17 @@ public void setGlobalStateRestoreListener(final StateRestoreListener globalState
         }
     }
+    public void setStandbyUpdateListener(final StandbyUpdateListener globalStandbyListener) {

Review Comment:
Can you add javadocs (see above API for reference)
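
For the try-catch suggestion on the `onUpdateStart` call in `StoreChangelogReader`, here is a rough sketch of the shape I have in mind. It is not the actual Kafka code: `UpdateStartListener`, `ListenerFailedException`, and `StandbyListenerInvoker` are hypothetical names made up for illustration, the callback is simplified (the real one also takes a `TopicPartition`), and the log sink is a plain `Consumer<String>` standing in for the class's existing logger.

```java
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for the listener under review
// (the real callback also takes a TopicPartition).
interface UpdateStartListener {
    void onUpdateStart(String storeName, long startingOffset, long currentEndOffset);
}

// Hypothetical exception type; the real code would presumably rethrow
// whatever the restore-listener path already uses.
final class ListenerFailedException extends RuntimeException {
    ListenerFailedException(final String message, final Throwable cause) {
        super(message, cause);
    }
}

final class StandbyListenerInvoker {

    private StandbyListenerInvoker() { }

    // Invokes the user callback; on failure, logs which listener threw
    // and rethrows with the original exception attached as the cause.
    static void notifyUpdateStart(final UpdateStartListener listener,
                                  final String storeName,
                                  final long startingOffset,
                                  final long currentEndOffset,
                                  final Consumer<String> errorLog) {
        try {
            listener.onUpdateStart(storeName, startingOffset, currentEndOffset);
        } catch (final Exception e) {
            final String message =
                "Standby update listener failed on update start for store " + storeName;
            errorLog.accept(message);
            throw new ListenerFailedException(message, e);
        }
    }
}
```

The same pattern would apply to the restore listener call, with the message naming that listener instead, so the log makes it obvious which user callback threw.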