clolov commented on code in PR #13196:
URL: https://github.com/apache/kafka/pull/13196#discussion_r1097383481


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -49,6 +49,25 @@ import org.apache.kafka.server.log.internals.{AppendOrigin, 
FetchDataInfo, Fetch
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
+/**
+ * Listener receives notification from an Online Partition.
+ *
+ * A listener can be (re-)registered to an Online partition only. The listener
+ * is notified as long as the partition remains Online. When the partition 
fails
+ * or is deleted, respectively `onFailed` or `onDeleted` are called once. No 
further
+ * notifications are sent after this point on.
+ *
+ * Note that the callbacks are executed in the thread that triggers the change
+ * AND that locks may be hold during their execution. They are meant to be used

Review Comment:
   ```suggestion
    * AND that locks may be held during their execution. They are meant to be 
used
   ```



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -2799,6 +2822,206 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(replicas, partition.assignmentState.replicas)
   }
 
+  @Test
+  def testAddAndRemoveListeners(): Unit = {
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, 
offsetCheckpoints, topicId = None)
+
+    partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(0)
+        .setLeader(brokerId)
+        .setLeaderEpoch(0)
+        .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setPartitionEpoch(1)
+        .setIsNew(true),
+      offsetCheckpoints,
+      topicId = None)
+
+    val listener1 = new MockPartitionListener()
+    val listener2 = new MockPartitionListener()
+
+    assertTrue(partition.maybeAddListener(listener1))
+    listener1.verify(expectedHighWatermark = 0L)
+
+    partition.appendRecordsToLeader(
+      records = TestUtils.records(List(new SimpleRecord("k1".getBytes, 
"v1".getBytes))),
+      origin = AppendOrigin.CLIENT,
+      requiredAcks = 0,
+      requestLocal = RequestLocal.NoCaching
+    )
+
+    listener1.verify()
+    listener2.verify()
+
+    assertTrue(partition.maybeAddListener(listener2))
+    listener2.verify(expectedHighWatermark = 0L)
+
+    partition.appendRecordsToLeader(
+      records = TestUtils.records(List(new SimpleRecord("k2".getBytes, 
"v2".getBytes))),
+      origin = AppendOrigin.CLIENT,
+      requiredAcks = 0,
+      requestLocal = RequestLocal.NoCaching
+    )
+
+    fetchFollower(
+      partition = partition,
+      replicaId = brokerId + 1,
+      fetchOffset = partition.localLogOrException.logEndOffset
+    )
+
+    listener1.verify(expectedHighWatermark = 
partition.localLogOrException.logEndOffset)
+    listener2.verify(expectedHighWatermark = 
partition.localLogOrException.logEndOffset)
+
+    partition.removeListener(listener1)
+
+    partition.appendRecordsToLeader(
+      records = TestUtils.records(List(new SimpleRecord("k3".getBytes, 
"v3".getBytes))),
+      origin = AppendOrigin.CLIENT,
+      requiredAcks = 0,
+      requestLocal = RequestLocal.NoCaching
+    )
+
+    fetchFollower(
+      partition = partition,
+      replicaId = brokerId + 1,
+      fetchOffset = partition.localLogOrException.logEndOffset
+    )
+
+    listener1.verify()
+    listener2.verify(expectedHighWatermark = 
partition.localLogOrException.logEndOffset)
+  }
+
+  @Test
+  def testAddListenerFailsWhenPartitionIsDeleted(): Unit = {
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, 
offsetCheckpoints, topicId = None)
+
+    partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(0)
+        .setLeader(brokerId)
+        .setLeaderEpoch(0)
+        .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setPartitionEpoch(1)
+        .setIsNew(true),
+      offsetCheckpoints,
+      topicId = None)
+
+    partition.delete()
+
+    assertFalse(partition.maybeAddListener(new MockPartitionListener()))
+  }
+
+  @Test
+  def testPartitionListenerWhenLogOffsetsChanged(): Unit = {
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, 
offsetCheckpoints, topicId = None)
+
+    partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(0)
+        .setLeader(brokerId)
+        .setLeaderEpoch(0)
+        .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setPartitionEpoch(1)
+        .setIsNew(true),
+      offsetCheckpoints,
+      topicId = None)
+
+    val listener = new MockPartitionListener()
+    partition.maybeAddListener(listener)

Review Comment:
   Nit:
   ```suggestion
       assertTrue(partition.maybeAddListener(listener))
   ```



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -334,6 +334,29 @@ class ReplicaManager(val config: KafkaConfig,
     topicPartitions.foreach(tp => 
delayedFetchPurgatory.checkAndComplete(TopicPartitionOperationKey(tp)))
   }
 
+  /**
+   * Registers the provided listener to the partition iff the partition is 
online.
+   */
+  def maybeAddListener(partition: TopicPartition, listener: 
PartitionListener): Boolean = {
+    getPartition(partition) match {
+      case HostedPartition.Online(partition) =>
+        partition.maybeAddListener(listener)
+      case _ =>
+        false
+    }
+  }
+
+  /**
+   * Removes the provided listener from the partition.
+   */
+  def removeListener(partition: TopicPartition, listener: PartitionListener): 
Unit = {
+    getPartition(partition) match {
+      case HostedPartition.Online(partition) =>
+        partition.removeListener(listener)
+      case _ => // Ignore

Review Comment:
   Could you please elaborate why we cannot remove a listener from an offline 
partition as well? Most probably I lack context, but this seems like a 
reasonable operation to me.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -73,6 +73,22 @@ object LogAppendInfo {
       offsetsMonotonic = false, -1L, recordErrors, errorMessage)
 }
 
+/**
+ * Listener receive notification from the Log.
+ *
+ * Note that the callbacks are executed in the thread that triggers the change
+ * AND that locks may be hold during their execution. They are meant to be used

Review Comment:
   ```suggestion
    * AND that locks may be held during their execution. They are meant to be 
used
   ```



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -2799,6 +2822,206 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(replicas, partition.assignmentState.replicas)
   }
 
+  @Test
+  def testAddAndRemoveListeners(): Unit = {
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, 
offsetCheckpoints, topicId = None)
+
+    partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(0)
+        .setLeader(brokerId)
+        .setLeaderEpoch(0)
+        .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setPartitionEpoch(1)
+        .setIsNew(true),
+      offsetCheckpoints,
+      topicId = None)
+
+    val listener1 = new MockPartitionListener()
+    val listener2 = new MockPartitionListener()
+
+    assertTrue(partition.maybeAddListener(listener1))
+    listener1.verify(expectedHighWatermark = 0L)
+
+    partition.appendRecordsToLeader(
+      records = TestUtils.records(List(new SimpleRecord("k1".getBytes, 
"v1".getBytes))),
+      origin = AppendOrigin.CLIENT,
+      requiredAcks = 0,
+      requestLocal = RequestLocal.NoCaching
+    )
+
+    listener1.verify()
+    listener2.verify()
+
+    assertTrue(partition.maybeAddListener(listener2))
+    listener2.verify(expectedHighWatermark = 0L)
+
+    partition.appendRecordsToLeader(
+      records = TestUtils.records(List(new SimpleRecord("k2".getBytes, 
"v2".getBytes))),
+      origin = AppendOrigin.CLIENT,
+      requiredAcks = 0,
+      requestLocal = RequestLocal.NoCaching
+    )
+
+    fetchFollower(
+      partition = partition,
+      replicaId = brokerId + 1,
+      fetchOffset = partition.localLogOrException.logEndOffset
+    )
+
+    listener1.verify(expectedHighWatermark = 
partition.localLogOrException.logEndOffset)
+    listener2.verify(expectedHighWatermark = 
partition.localLogOrException.logEndOffset)
+
+    partition.removeListener(listener1)
+
+    partition.appendRecordsToLeader(
+      records = TestUtils.records(List(new SimpleRecord("k3".getBytes, 
"v3".getBytes))),
+      origin = AppendOrigin.CLIENT,
+      requiredAcks = 0,
+      requestLocal = RequestLocal.NoCaching
+    )
+
+    fetchFollower(
+      partition = partition,
+      replicaId = brokerId + 1,
+      fetchOffset = partition.localLogOrException.logEndOffset
+    )
+
+    listener1.verify()
+    listener2.verify(expectedHighWatermark = 
partition.localLogOrException.logEndOffset)
+  }
+
+  @Test
+  def testAddListenerFailsWhenPartitionIsDeleted(): Unit = {
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, 
offsetCheckpoints, topicId = None)
+
+    partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(0)
+        .setLeader(brokerId)
+        .setLeaderEpoch(0)
+        .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setPartitionEpoch(1)
+        .setIsNew(true),
+      offsetCheckpoints,
+      topicId = None)
+
+    partition.delete()
+
+    assertFalse(partition.maybeAddListener(new MockPartitionListener()))
+  }
+
+  @Test
+  def testPartitionListenerWhenLogOffsetsChanged(): Unit = {
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, 
offsetCheckpoints, topicId = None)
+
+    partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(0)
+        .setLeader(brokerId)
+        .setLeaderEpoch(0)
+        .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setPartitionEpoch(1)
+        .setIsNew(true),
+      offsetCheckpoints,
+      topicId = None)
+
+    val listener = new MockPartitionListener()
+    partition.maybeAddListener(listener)
+    listener.verify(expectedHighWatermark = 0L)
+
+    partition.appendRecordsToLeader(
+      records = TestUtils.records(List(new SimpleRecord("k1".getBytes, 
"v1".getBytes))),
+      origin = AppendOrigin.CLIENT,
+      requiredAcks = 0,
+      requestLocal = RequestLocal.NoCaching
+    )
+
+    listener.verify()
+
+    fetchFollower(
+      partition = partition,
+      replicaId = brokerId + 1,
+      fetchOffset = partition.localLogOrException.logEndOffset
+    )
+
+    listener.verify(expectedHighWatermark = 
partition.localLogOrException.logEndOffset)
+
+    partition.truncateFullyAndStartAt(0L, false)
+
+    listener.verify(expectedHighWatermark = 0L)
+  }
+
+  @Test
+  def testPartitionListenerWhenCurrentIsReplacedWithFutureLog(): Unit = {
+    logManager.maybeUpdatePreferredLogDir(topicPartition, 
logDir1.getAbsolutePath)
+    partition.createLogIfNotExists(isNew = true, isFutureReplica = false, 
offsetCheckpoints, topicId = None)
+    assertTrue(partition.log.isDefined)
+
+    partition.makeLeader(
+      new LeaderAndIsrPartitionState()
+        .setControllerEpoch(0)
+        .setLeader(brokerId)
+        .setLeaderEpoch(0)
+        .setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+        .setPartitionEpoch(1)
+        .setIsNew(true),
+      offsetCheckpoints,
+      topicId = None)
+
+    val listener = new MockPartitionListener()
+    partition.maybeAddListener(listener)

Review Comment:
   My only reasoning for the nit is that the test can fail fast if someone 
changes the implementation logic:
   ```suggestion
       assertTrue(partition.maybeAddListener(listener))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to