apoorvmittal10 commented on code in PR #17796:
URL: https://github.com/apache/kafka/pull/17796#discussion_r1868505586
##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -2496,6 +2498,98 @@ public void
testReplicaManagerFetchMultipleSharePartitionsException() {
assertTrue(partitionCacheMap.isEmpty());
}
+ @Test
+ public void testListenerRegistration() {
+ String groupId = "grp";
+ Uuid memberId = Uuid.randomUuid();
+
+ TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("foo", 0));
+ TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("bar", 0));
+ Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
+ partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
+ partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
+
+ ReplicaManager mockReplicaManager = mock(ReplicaManager.class);
+ Partition partition = mockPartition();
+
when(mockReplicaManager.getPartitionOrException(Mockito.any())).thenReturn(partition);
+
+ SharePartitionManager sharePartitionManager =
SharePartitionManagerBuilder.builder()
+ .withReplicaManager(mockReplicaManager)
+ .withTimer(mockTimer)
+ .build();
+
+ sharePartitionManager.fetchMessages(groupId, memberId.toString(),
FETCH_PARAMS, partitionMaxBytes);
+ // Validate that the listener is registered.
+ verify(mockReplicaManager, times(2)).maybeAddListener(any(), any());
+ }
+
+ @Test
+ public void testSharePartitionListenerOnFailed() {
+ SharePartitionKey sharePartitionKey = new SharePartitionKey("grp",
+ new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo",
0)));
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
+ ReplicaManager mockReplicaManager = mock(ReplicaManager.class);
+
+ SharePartitionListener partitionListener = new
SharePartitionListener(sharePartitionKey, mockReplicaManager,
partitionCacheMap);
+ testSharePartitionListener(sharePartitionKey, partitionCacheMap,
mockReplicaManager, partitionListener::onFailed);
+ }
+
+ @Test
+ public void testSharePartitionListenerOnDeleted() {
+ SharePartitionKey sharePartitionKey = new SharePartitionKey("grp",
+ new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo",
0)));
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
+ ReplicaManager mockReplicaManager = mock(ReplicaManager.class);
+
+ SharePartitionListener partitionListener = new
SharePartitionListener(sharePartitionKey, mockReplicaManager,
partitionCacheMap);
+ testSharePartitionListener(sharePartitionKey, partitionCacheMap,
mockReplicaManager, partitionListener::onDeleted);
+ }
+
+ @Test
+ public void testSharePartitionListenerOnBecomingFollower() {
+ SharePartitionKey sharePartitionKey = new SharePartitionKey("grp",
+ new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo",
0)));
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
+ ReplicaManager mockReplicaManager = mock(ReplicaManager.class);
+
+ SharePartitionListener partitionListener = new
SharePartitionListener(sharePartitionKey, mockReplicaManager,
partitionCacheMap);
+ testSharePartitionListener(sharePartitionKey, partitionCacheMap,
mockReplicaManager, partitionListener::onBecomingFollower);
+ }
+
+ private void testSharePartitionListener(
+ SharePartitionKey sharePartitionKey,
+ Map<SharePartitionKey, SharePartition> partitionCacheMap,
+ ReplicaManager mockReplicaManager,
+ Consumer<TopicPartition> listenerConsumer
+ ) {
+ // Add another share partition to the cache.
+ TopicPartition tp = new TopicPartition("foo", 1);
+ TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), tp);
+ SharePartitionKey spk = new SharePartitionKey("grp", tpId);
+
+ SharePartition sp0 = mock(SharePartition.class);
+ SharePartition sp1 = mock(SharePartition.class);
+ partitionCacheMap.put(sharePartitionKey, sp0);
+ partitionCacheMap.put(spk, sp1);
+
+ // Invoke listener for first share partition.
+
listenerConsumer.accept(sharePartitionKey.topicIdPartition().topicPartition());
+
+ // Validate that the share partition is removed from the cache.
+ assertEquals(1, partitionCacheMap.size());
+ assertFalse(partitionCacheMap.containsKey(sharePartitionKey));
+ verify(sp0, times(1)).markFenced();
+ verify(mockReplicaManager, times(1)).removeListener(any(), any());
+
+ // Invoke listener for second share partition.
Review Comment:
Done.
##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -2496,6 +2498,98 @@ public void
testReplicaManagerFetchMultipleSharePartitionsException() {
assertTrue(partitionCacheMap.isEmpty());
}
+ @Test
+ public void testListenerRegistration() {
+ String groupId = "grp";
+ Uuid memberId = Uuid.randomUuid();
+
+ TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("foo", 0));
+ TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("bar", 0));
+ Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
+ partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
+ partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
+
+ ReplicaManager mockReplicaManager = mock(ReplicaManager.class);
+ Partition partition = mockPartition();
+
when(mockReplicaManager.getPartitionOrException(Mockito.any())).thenReturn(partition);
+
+ SharePartitionManager sharePartitionManager =
SharePartitionManagerBuilder.builder()
+ .withReplicaManager(mockReplicaManager)
+ .withTimer(mockTimer)
+ .build();
+
+ sharePartitionManager.fetchMessages(groupId, memberId.toString(),
FETCH_PARAMS, partitionMaxBytes);
+ // Validate that the listener is registered.
+ verify(mockReplicaManager, times(2)).maybeAddListener(any(), any());
+ }
+
+ @Test
+ public void testSharePartitionListenerOnFailed() {
+ SharePartitionKey sharePartitionKey = new SharePartitionKey("grp",
+ new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo",
0)));
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
+ ReplicaManager mockReplicaManager = mock(ReplicaManager.class);
+
+ SharePartitionListener partitionListener = new
SharePartitionListener(sharePartitionKey, mockReplicaManager,
partitionCacheMap);
+ testSharePartitionListener(sharePartitionKey, partitionCacheMap,
mockReplicaManager, partitionListener::onFailed);
+ }
+
+ @Test
+ public void testSharePartitionListenerOnDeleted() {
+ SharePartitionKey sharePartitionKey = new SharePartitionKey("grp",
+ new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo",
0)));
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
+ ReplicaManager mockReplicaManager = mock(ReplicaManager.class);
+
+ SharePartitionListener partitionListener = new
SharePartitionListener(sharePartitionKey, mockReplicaManager,
partitionCacheMap);
+ testSharePartitionListener(sharePartitionKey, partitionCacheMap,
mockReplicaManager, partitionListener::onDeleted);
+ }
+
+ @Test
+ public void testSharePartitionListenerOnBecomingFollower() {
+ SharePartitionKey sharePartitionKey = new SharePartitionKey("grp",
+ new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo",
0)));
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
+ ReplicaManager mockReplicaManager = mock(ReplicaManager.class);
+
+ SharePartitionListener partitionListener = new
SharePartitionListener(sharePartitionKey, mockReplicaManager,
partitionCacheMap);
+ testSharePartitionListener(sharePartitionKey, partitionCacheMap,
mockReplicaManager, partitionListener::onBecomingFollower);
+ }
+
+ private void testSharePartitionListener(
+ SharePartitionKey sharePartitionKey,
+ Map<SharePartitionKey, SharePartition> partitionCacheMap,
+ ReplicaManager mockReplicaManager,
+ Consumer<TopicPartition> listenerConsumer
+ ) {
+ // Add another share partition to the cache.
+ TopicPartition tp = new TopicPartition("foo", 1);
+ TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), tp);
+ SharePartitionKey spk = new SharePartitionKey("grp", tpId);
+
+ SharePartition sp0 = mock(SharePartition.class);
+ SharePartition sp1 = mock(SharePartition.class);
+ partitionCacheMap.put(sharePartitionKey, sp0);
+ partitionCacheMap.put(spk, sp1);
+
+ // Invoke listener for first share partition.
+
listenerConsumer.accept(sharePartitionKey.topicIdPartition().topicPartition());
+
+ // Validate that the share partition is removed from the cache.
+ assertEquals(1, partitionCacheMap.size());
+ assertFalse(partitionCacheMap.containsKey(sharePartitionKey));
+ verify(sp0, times(1)).markFenced();
+ verify(mockReplicaManager, times(1)).removeListener(any(), any());
+
+ // Invoke listener for second share partition.
+ listenerConsumer.accept(tp);
+ // The second share partition should not be removed as the listener is
attached to single topic partition.
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]