ableegoldman commented on code in PR #14735:
URL: https://github.com/apache/kafka/pull/14735#discussion_r1412648726


##########
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+    enum SuspendReason {
+        MIGRATED,
+        PROMOTED
+    }
+
+    /**
+     * Method called upon the creation of the Standby Task.
+     *
+     * @param topicPartition   the TopicPartition of the Standby Task.
+     * @param storeName        the name of the store being watched by this 
Standby Task.
+     * @param startingOffset   the offset from which the Standby Task starts 
watching.
+     */
+    void onUpdateStart(final TopicPartition topicPartition,
+                       final String storeName,
+                       final long startingOffset);
+
+    /**
+     * Method called after restoring a batch of records. In this case the 
maximum size of the batch is whatever

Review Comment:
   ```suggestion
        * Method called after loading a batch of records. In this case the 
maximum size of the batch is whatever
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+    enum SuspendReason {
+        MIGRATED,
+        PROMOTED
+    }
+
+    /**
+     * Method called upon the creation of the Standby Task.
+     *
+     * @param topicPartition   the TopicPartition of the Standby Task.
+     * @param storeName        the name of the store being watched by this 
Standby Task.

Review Comment:
   ditto here, copy this to the other two callbacks as well
   ```suggestion
        * @param storeName the name of the store being loaded
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+    enum SuspendReason {
+        MIGRATED,
+        PROMOTED
+    }
+
+    /**
+     * Method called upon the creation of the Standby Task.
+     *
+     * @param topicPartition   the TopicPartition of the Standby Task.
+     * @param storeName        the name of the store being watched by this 
Standby Task.
+     * @param startingOffset   the offset from which the Standby Task starts 
watching.
+     */
+    void onUpdateStart(final TopicPartition topicPartition,
+                       final String storeName,
+                       final long startingOffset);
+
+    /**
+     * Method called after restoring a batch of records. In this case the 
maximum size of the batch is whatever
+     * the value of the MAX_POLL_RECORDS is set to.
+     * <n>
+     * This method is called after restoring each batch and it is advised to 
keep processing to a minimum.
+     * Any heavy processing will hold up recovering the next batch, hence 
slowing down the restore process as a
+     * whole.
+     *
+     * If you need to do any extended processing or connecting to an external 
service consider doing so asynchronously.

Review Comment:
   ```suggestion
        * This method is called after loading each batch and it is advised to 
keep processing to a minimum.
        * Any heavy processing will block the state updater thread and slow 
down the rate of standby task 
        * loading. Therefore, if you need to do any extended processing or 
connect to an external service,
        * consider doing so asynchronously.
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+    enum SuspendReason {
+        MIGRATED,
+        PROMOTED
+    }
+
+    /**
+     * Method called upon the creation of the Standby Task.
+     *
+     * @param topicPartition   the TopicPartition of the Standby Task.

Review Comment:
   Use this (or something like it) for all three callbacks:
   ```suggestion
        * @param topicPartition the changelog TopicPartition for this standby 
task
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+    enum SuspendReason {
+        MIGRATED,
+        PROMOTED
+    }
+
+    /**
+     * Method called upon the creation of the Standby Task.
+     *
+     * @param topicPartition   the TopicPartition of the Standby Task.
+     * @param storeName        the name of the store being watched by this 
Standby Task.
+     * @param startingOffset   the offset from which the Standby Task starts 
watching.

Review Comment:
   ```suggestion
        * @param startingOffset   the offset from which the standby task begins 
consuming from the changelog
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+    enum SuspendReason {
+        MIGRATED,
+        PROMOTED
+    }
+
+    /**
+     * Method called upon the creation of the Standby Task.
+     *
+     * @param topicPartition   the TopicPartition of the Standby Task.
+     * @param storeName        the name of the store being watched by this 
Standby Task.
+     * @param startingOffset   the offset from which the Standby Task starts 
watching.
+     */
+    void onUpdateStart(final TopicPartition topicPartition,
+                       final String storeName,
+                       final long startingOffset);
+
+    /**
+     * Method called after restoring a batch of records. In this case the 
maximum size of the batch is whatever
+     * the value of the MAX_POLL_RECORDS is set to.
+     * <n>
+     * This method is called after restoring each batch and it is advised to 
keep processing to a minimum.
+     * Any heavy processing will hold up recovering the next batch, hence 
slowing down the restore process as a
+     * whole.
+     *
+     * If you need to do any extended processing or connecting to an external 
service consider doing so asynchronously.
+     *
+     * @param topicPartition the TopicPartition containing the values to 
restore
+     * @param storeName the name of the store undergoing restoration
+     * @param batchEndOffset the inclusive ending offset for the current 
restored batch for this TopicPartition

Review Comment:
   ```suggestion
        * @param batchEndOffset the changelog end offset (inclusive) of the 
batch that was just loaded
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+    enum SuspendReason {
+        MIGRATED,
+        PROMOTED
+    }
+
+    /**
+     * Method called upon the creation of the Standby Task.
+     *
+     * @param topicPartition   the TopicPartition of the Standby Task.
+     * @param storeName        the name of the store being watched by this 
Standby Task.
+     * @param startingOffset   the offset from which the Standby Task starts 
watching.
+     */
+    void onUpdateStart(final TopicPartition topicPartition,
+                       final String storeName,
+                       final long startingOffset);
+
+    /**
+     * Method called after restoring a batch of records. In this case the 
maximum size of the batch is whatever
+     * the value of the MAX_POLL_RECORDS is set to.
+     * <n>
+     * This method is called after restoring each batch and it is advised to 
keep processing to a minimum.
+     * Any heavy processing will hold up recovering the next batch, hence 
slowing down the restore process as a
+     * whole.
+     *
+     * If you need to do any extended processing or connecting to an external 
service consider doing so asynchronously.
+     *
+     * @param topicPartition the TopicPartition containing the values to 
restore
+     * @param storeName the name of the store undergoing restoration
+     * @param batchEndOffset the inclusive ending offset for the current 
restored batch for this TopicPartition
+     * @param numRestored the total number of records restored in this batch 
for this TopicPartition
+     * @param currentEndOffset the current end offset of the changelog topic 
partition.
+     */
+    void onBatchLoaded(final TopicPartition topicPartition,
+                       final String storeName,
+                       final TaskId taskId,
+                       final long batchEndOffset,
+                       final long numRestored,
+                       final long currentEndOffset);
+
+    /**
+     * Method called after a Standby Task is closed, either because the 
Standby Task was promoted to an Active Task
+     * or because the Standby Task was migrated to another instance (in which 
case the data will be cleaned up
+     * after state.cleanup.delay.ms).

Review Comment:
   ```suggestion
        * This method is called when the corresponding standby task stops 
updating, for the provided reason.
        * <p>
        * If the task was {@code MIGRATED} to another instance, this callback 
will be invoked after this
        * state store (and the task itself) are closed (in which case the data 
will be cleaned up after 
        * state.cleanup.delay.ms).
        * If the task was {@code PROMOTED} to an active task, the state store 
will not be closed, and the 
        * callback will be invoked after unregistering it as a standby task but 
before re-registering it as an active task 
        * and beginning restoration. In other words, this will always called 
before the corresponding 
        * {@link StateRestoreListener#onRestoreStart} call is made.
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+    enum SuspendReason {
+        MIGRATED,
+        PROMOTED
+    }
+
+    /**
+     * Method called upon the creation of the Standby Task.
+     *
+     * @param topicPartition   the TopicPartition of the Standby Task.
+     * @param storeName        the name of the store being watched by this 
Standby Task.
+     * @param startingOffset   the offset from which the Standby Task starts 
watching.
+     */
+    void onUpdateStart(final TopicPartition topicPartition,
+                       final String storeName,
+                       final long startingOffset);
+
+    /**
+     * Method called after restoring a batch of records. In this case the 
maximum size of the batch is whatever
+     * the value of the MAX_POLL_RECORDS is set to.
+     * <n>
+     * This method is called after restoring each batch and it is advised to 
keep processing to a minimum.
+     * Any heavy processing will hold up recovering the next batch, hence 
slowing down the restore process as a
+     * whole.
+     *
+     * If you need to do any extended processing or connecting to an external 
service consider doing so asynchronously.
+     *
+     * @param topicPartition the TopicPartition containing the values to 
restore
+     * @param storeName the name of the store undergoing restoration
+     * @param batchEndOffset the inclusive ending offset for the current 
restored batch for this TopicPartition
+     * @param numRestored the total number of records restored in this batch 
for this TopicPartition

Review Comment:
   ```suggestion
        * @param batchSize the total number of records in the batch that was 
just loaded
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/StandbyUpdateListener.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.TopicPartition;
+
+public interface StandbyUpdateListener {
+
+    enum SuspendReason {
+        MIGRATED,
+        PROMOTED
+    }
+
+    /**
+     * Method called upon the creation of the Standby Task.

Review Comment:
   A callback that will be invoked after registering the changelogs for each 
state store in a standby task. It is guaranteed to always be invoked before any 
records are loaded into the standby store.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to