chia7712 commented on code in PR #20083:
URL: https://github.com/apache/kafka/pull/20083#discussion_r2366248494


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionState.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.metadata.LeaderRecoveryState;
+
+import java.util.Set;
+
+/**
+ * Represents the state of a partition, including its In-Sync Replicas (ISR) and leader recovery state.
+ */
+public interface PartitionState {
+    /**
+     * Includes only the in-sync replicas which have been committed to Controller.
+     */
+    Set<Integer> isr();
+
+    /**
+     * This set may include un-committed ISR members following an expansion. This "effective" ISR is used for advancing

Review Comment:
   `un-committed` -> `uncommitted`



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionListener.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Listener receives notification from an Online Partition.
+ *

Review Comment:
   ` * <p>`



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/PendingPartitionChange.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.metadata.LeaderAndIsr;
+
+/**
+ * Represents a partition state that is currently undergoing a change, such as an ISR expansion or shrinking.
+ */
+public interface PendingPartitionChange extends PartitionState {
+    /**
+     * Returns the last committed partition state before this pending change.
+     */
+    CommittedPartitionState lastCommittedState();
+    /**
+     * Returns the LeaderAndIsr object that was sent to the controller for this pending change.

Review Comment:
   `Returns the LeaderAndIsr object sent to the controller for this pending change.`



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/OngoingReassignmentState.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import java.util.List;
+
+public record OngoingReassignmentState(
+        List<Integer> addingReplicas,
+        List<Integer> removingReplicas,
+        List<Integer> replicas
+) implements AssignmentState {
+
+    public OngoingReassignmentState {
+        addingReplicas = List.copyOf(addingReplicas);
+        removingReplicas = List.copyOf(removingReplicas);
+        replicas = List.copyOf(replicas);
+    }
+
+    @Override
+    public int replicationFactor() {
+        return (int) replicas.stream().filter(r -> !addingReplicas.contains(r)).count();

Review Comment:
   ```
   Note: replicas may also include those currently being added.
   ```



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/PendingPartitionChange.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.metadata.LeaderAndIsr;
+
+/**
+ * Represents a partition state that is currently undergoing a change, such as an ISR expansion or shrinking.

Review Comment:
   `Represents a partition state currently undergoing a change, such as an ISR expansion or shrinking.`



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionListener.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Listener receives notification from an Online Partition.
+ *
+ * A listener can be (re-)registered to an Online partition only. The listener
+ * is notified as long as the partition remains Online. When the partition fails
+ * or is deleted, respectively `onFailed` or `onDeleted` are called once. No further
+ * notifications are sent after this point on.
+ *
+ * Note that the callbacks are executed in the thread that triggers the change
+ * AND that locks may be held during their execution. They are meant to be used
+ * as notification mechanism only.
+ */
+public interface PartitionListener {
+    /**
+     * Called when the Log increments its high watermark.
+     *
+     * @param partition The topic partition for which the high watermark was updated.
+     * @param offset    The new high watermark offset.
+     */
+    default void onHighWatermarkUpdated(TopicPartition partition, long offset) {}
+
+    /**
+     * Called when the Partition (or replica) on this broker has a failure (e.g. goes offline).
+     *
+     * @param partition The topic partition that failed.
+     */
+    default void onFailed(TopicPartition partition) {}
+
+    /**
+     * Called when the Partition (or replica) on this broker is deleted. Note that it does not mean
+     * that the partition was deleted but only that this broker does not host a replica of it any more.
+     *
+     * @param partition The topic partition that was deleted from this broker.
+     */
+    default void onDeleted(TopicPartition partition) {}
+
+    /**
+     * Called when the Partition on this broker is transitioned to follower.
+     *
+     * @param partition The topic partition that transitioned to follower role.

Review Comment:
   `a follower role`



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionListener.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Listener receives notification from an Online Partition.
+ *
+ * A listener can be (re-)registered to an Online partition only. The listener
+ * is notified as long as the partition remains Online. When the partition fails
+ * or is deleted, respectively `onFailed` or `onDeleted` are called once. No further
+ * notifications are sent after this point on.
+ *

Review Comment:
   ` * <p>`



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionListener.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Listener receives notification from an Online Partition.
+ *
+ * A listener can be (re-)registered to an Online partition only. The listener
+ * is notified as long as the partition remains Online. When the partition fails
+ * or is deleted, respectively `onFailed` or `onDeleted` are called once. No further
+ * notifications are sent after this point on.
+ *
+ * Note that the callbacks are executed in the thread that triggers the change
+ * AND that locks may be held during their execution. They are meant to be used
+ * as notification mechanism only.

Review Comment:
   `as a notification mechanism only.`



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionListener.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Listener receives notification from an Online Partition.
+ *
+ * A listener can be (re-)registered to an Online partition only. The listener
+ * is notified as long as the partition remains Online. When the partition fails
+ * or is deleted, respectively `onFailed` or `onDeleted` are called once. No further
+ * notifications are sent after this point on.
+ *
+ * Note that the callbacks are executed in the thread that triggers the change
+ * AND that locks may be held during their execution. They are meant to be used
+ * as notification mechanism only.
+ */
+public interface PartitionListener {
+    /**
+     * Called when the Log increments its high watermark.
+     *
+     * @param partition The topic partition for which the high watermark was updated.
+     * @param offset    The new high watermark offset.
+     */
+    default void onHighWatermarkUpdated(TopicPartition partition, long offset) {}
+
+    /**
+     * Called when the Partition (or replica) on this broker has a failure (e.g. goes offline).

Review Comment:
   `e.g.,`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to