apoorvmittal10 commented on code in PR #16842:
URL: https://github.com/apache/kafka/pull/16842#discussion_r1823155007
##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -2157,19 +2153,203 @@ public void testShareFetchProcessingExceptions()
throws Exception {
future::isDone,
DELAYED_SHARE_FETCH_TIMEOUT_MS,
() -> "Processing for delayed share fetch request not finished.");
- assertTrue(future.isCompletedExceptionally());
- assertFutureThrows(future, RuntimeException.class, "Error creating
instance");
+ validateShareFetchFutureException(future, tp0,
Errors.UNKNOWN_SERVER_ERROR, "Error creating instance");
+ }
+
+ @Test
+ public void testSharePartitionInitializationFailure() throws Exception {
+ String groupId = "grp";
+ TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("foo", 0));
+ Map<TopicIdPartition, Integer> partitionMaxBytes =
Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
+
+ // Send map to check no share partition is created.
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
+ // Validate when partition is not the leader.
+ Partition partition = mock(Partition.class);
+ when(partition.isLeader()).thenReturn(false);
+
+ ReplicaManager replicaManager = mock(ReplicaManager.class);
+ // First check should throw KafkaStorageException, second check should
return partition which
+ // is not leader.
+ when(replicaManager.getPartitionOrException(any()))
+ .thenThrow(new KafkaStorageException("Exception"))
+ .thenReturn(partition);
+ SharePartitionManager sharePartitionManager =
SharePartitionManagerBuilder.builder()
+ .withReplicaManager(replicaManager)
+ .withPartitionCacheMap(partitionCacheMap)
+ .build();
- // Throw exception from share partition for second fetch request.
- when(sp0.maybeInitialize()).thenThrow(new RuntimeException("Error
initializing instance"));
+ // Validate when exception is thrown.
+ CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> future =
+ sharePartitionManager.fetchMessages(groupId,
Uuid.randomUuid().toString(), FETCH_PARAMS, partitionMaxBytes);
+ TestUtils.waitForCondition(
+ future::isDone,
+ DELAYED_SHARE_FETCH_TIMEOUT_MS,
+ () -> "Processing for delayed share fetch request not finished.");
+ validateShareFetchFutureException(future, tp0,
Errors.KAFKA_STORAGE_ERROR, "Exception");
+ assertTrue(partitionCacheMap.isEmpty());
+ // Validate when partition is not leader.
future = sharePartitionManager.fetchMessages(groupId,
Uuid.randomUuid().toString(), FETCH_PARAMS, partitionMaxBytes);
TestUtils.waitForCondition(
future::isDone,
DELAYED_SHARE_FETCH_TIMEOUT_MS,
() -> "Processing for delayed share fetch request not finished.");
- assertTrue(future.isCompletedExceptionally());
- assertFutureThrows(future, RuntimeException.class, "Error initializing
instance");
+ validateShareFetchFutureException(future, tp0,
Errors.NOT_LEADER_OR_FOLLOWER);
+ assertTrue(partitionCacheMap.isEmpty());
+ }
+
+ @Test
+ public void testSharePartitionPartialInitializationFailure() throws
Exception {
+ String groupId = "grp";
+ Uuid memberId1 = Uuid.randomUuid();
+ TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("foo", 0));
+ TopicIdPartition tp1 = new TopicIdPartition(memberId1, new
TopicPartition("foo", 1));
+ Map<TopicIdPartition, Integer> partitionMaxBytes = Map.of(tp0,
PARTITION_MAX_BYTES, tp1, PARTITION_MAX_BYTES);
+
+ // Mark partition1 as not the leader.
+ Partition partition1 = mock(Partition.class);
+ when(partition1.isLeader()).thenReturn(false);
+
+ ReplicaManager replicaManager = mock(ReplicaManager.class);
+ when(replicaManager.getPartitionOrException(any()))
+ .thenReturn(partition1);
+
+ SharePartition sp1 = mock(SharePartition.class);
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
+
+ when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+ when(sp1.canAcquireRecords()).thenReturn(true);
+
when(sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
+ when(sp1.acquire(anyString(), anyInt(), any())).thenReturn(new
ShareAcquiredRecords(Collections.emptyList(), 0));
+
+ DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
+ "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
+ DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+ mockReplicaManagerDelayedShareFetch(replicaManager,
delayedShareFetchPurgatory);
+
+ doAnswer(invocation ->
buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(),
any(), any(ReplicaQuota.class), anyBoolean());
+
+ SharePartitionManager sharePartitionManager =
SharePartitionManagerBuilder.builder()
+ .withReplicaManager(replicaManager)
+ .withPartitionCacheMap(partitionCacheMap)
+ .build();
+
+ // Validate when exception is thrown.
+ CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> future =
+ sharePartitionManager.fetchMessages(groupId,
Uuid.randomUuid().toString(), FETCH_PARAMS, partitionMaxBytes);
+ assertTrue(future.isDone());
+ assertFalse(future.isCompletedExceptionally());
+
+ Map<TopicIdPartition, PartitionData> partitionDataMap = future.get();
+ // For now only 1 successful partition is included, this will be fixed
in subsequent PRs.
+ assertEquals(1, partitionDataMap.size());
+ assertTrue(partitionDataMap.containsKey(tp1));
+ assertEquals(Errors.NONE.code(),
partitionDataMap.get(tp1).errorCode());
+
+ Mockito.verify(replicaManager, times(1)).readFromLog(
+ any(), any(), any(ReplicaQuota.class), anyBoolean());
+ }
+
+ @Test
+ public void testReplicaManagerFetchException() {
+ String groupId = "grp";
+ Uuid memberId = Uuid.randomUuid();
+ TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("foo", 0));
+ Map<TopicIdPartition, Integer> partitionMaxBytes =
Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
+
+ SharePartition sp0 = mock(SharePartition.class);
+ when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+ when(sp0.canAcquireRecords()).thenReturn(true);
+
when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
+ Map<SharePartitionKey, SharePartition> partitionCacheMap = new
HashMap<>();
+ partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
+
+ DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
+ "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
+ DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+ mockReplicaManagerDelayedShareFetch(mockReplicaManager,
delayedShareFetchPurgatory);
+
+ doThrow(new
RuntimeException("Exception")).when(mockReplicaManager).readFromLog(any(),
any(), any(ReplicaQuota.class), anyBoolean());
+
+ SharePartitionManager sharePartitionManager =
SharePartitionManagerBuilder.builder()
+ .withPartitionCacheMap(partitionCacheMap)
+ .withReplicaManager(mockReplicaManager)
+ .withTimer(mockTimer)
+ .build();
+
+ CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> future =
+ sharePartitionManager.fetchMessages(groupId, memberId.toString(),
FETCH_PARAMS, partitionMaxBytes);
+ validateShareFetchFutureException(future, tp0,
Errors.UNKNOWN_SERVER_ERROR, "Exception");
+ // Verify that the share partition is still in the cache on exception.
+ assertEquals(1, partitionCacheMap.size());
+
+ // Throw NotLeaderOrFollowerException from replica manager fetch which
should evict instance from the cache.
+ doThrow(new NotLeaderOrFollowerException("Leader
exception")).when(mockReplicaManager).readFromLog(any(), any(),
any(ReplicaQuota.class), anyBoolean());
+
+ future = sharePartitionManager.fetchMessages(groupId,
memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
+ validateShareFetchFutureException(future, tp0,
Errors.NOT_LEADER_OR_FOLLOWER, "Leader exception");
+ assertTrue(partitionCacheMap.isEmpty());
+ }
+
+ @Test
+ public void testReplicaManagerFetchMultipleSharePartitionsException() {
+ String groupId = "grp";
+ Uuid memberId = Uuid.randomUuid();
+
+ TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("foo", 0));
+ TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("bar", 0));
+ Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
+ partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
+ partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
+
+ SharePartition sp0 = mock(SharePartition.class);
+ when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+ when(sp0.canAcquireRecords()).thenReturn(true);
+
when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
+
+ SharePartition sp1 = mock(SharePartition.class);
+ // Do not make the share partition acquirable hence it shouldn't be
removed from the cache,
Review Comment:
Thanks, done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]