junrao commented on code in PR #16842:
URL: https://github.com/apache/kafka/pull/16842#discussion_r1821630956
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -490,6 +497,30 @@ public void acknowledgeSessionUpdate(String groupId, ShareRequestMetadata reqMet
            }
        }
+    /**
+     * The handleFetchException method is used to handle the exception that occurred while reading from log.
+     * The method will handle the exception for each topic-partition in the request. The share partition
+     * might get removed from the cache.
+     * <p>
+     * The replica read request might error out for one share partition
+     * but as we cannot determine which share partition errored out, we might remove all the share partitions
+     * in the request.
+     *
+     * @param groupId The group id in the share fetch request.
+     * @param topicIdPartitions The topic-partitions in the replica read request.
+     * @param future The future to complete with the exception.
+     * @param throwable The exception that occurred while fetching messages.
+     */
+    public void handleFetchException(
+        String groupId,
+        Set<TopicIdPartition> topicIdPartitions,
+        CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
+        Throwable throwable
+    ) {
+        topicIdPartitions.forEach(topicIdPartition -> handleFencedSharePartitionException(sharePartitionKey(groupId, topicIdPartition), throwable));
Review Comment:
This is weird. We actually don't know which partition caused the throwable. Ideally, we should just set a top-level error instead of applying it to each partition. We probably shouldn't remove the SharePartition here either, since we are not sure which partition to remove.
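For illustration, one possible shape (a rough sketch only, not code from this PR; it reuses just the names already visible in the diff above):

```java
// Sketch: surface a single request-level error and leave the partition cache
// untouched, since the failing partition cannot be identified from the throwable.
public void handleFetchException(
    String groupId,
    Set<TopicIdPartition> topicIdPartitions,
    CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
    Throwable throwable
) {
    log.error("Error processing fetch for group {}, partitions {}", groupId, topicIdPartitions, throwable);
    // Complete the whole fetch exceptionally instead of fencing each share partition.
    future.completeExceptionally(throwable);
}
```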
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -617,22 +667,44 @@ private void maybeCompleteInitializationWithException(
return;
}
-        if (throwable instanceof NotLeaderOrFollowerException || throwable instanceof FencedStateEpochException) {
+        // Remove the partition from the cache as it's failed to initialize.
+        partitionCacheMap.remove(sharePartitionKey);
+        // The partition initialization failed, so complete the request with the exception.
+        // The server should not be in this state, so log the error on broker and surface the same
+        // to the client. The broker should not be in this state, investigate the root cause of the error.
+        log.error("Error initializing share partition with key {}", sharePartitionKey, throwable);
+        maybeCompleteShareFetchWithException(future, Collections.singletonList(sharePartitionKey.topicIdPartition()), throwable);
+    }
+
+    private void handleFencedSharePartitionException(
+        SharePartitionKey sharePartitionKey,
+        Throwable throwable
+    ) {
+        if (throwable instanceof NotLeaderOrFollowerException || throwable instanceof FencedStateEpochException ||
+            throwable instanceof GroupIdNotFoundException || throwable instanceof UnknownTopicOrPartitionException) {
            log.info("The share partition with key {} is fenced: {}", sharePartitionKey, throwable.getMessage());
            // The share partition is fenced hence remove the partition from map and let the client retry.
            // But surface the error to the client so client might take some action i.e. re-fetch
            // the metadata and retry the fetch on new leader.
-            partitionCacheMap.remove(sharePartitionKey);
-            future.completeExceptionally(throwable);
-            return;
+            SharePartition sharePartition = partitionCacheMap.remove(sharePartitionKey);
+            if (sharePartition != null) {
+                sharePartition.markFenced();
+            }
        }
+    }
-        // The partition initialization failed, so complete the request with the exception.
-        // The server should not be in this state, so log the error on broker and surface the same
-        // to the client. As of now this state is in-recoverable for the broker, and we should
-        // investigate the root cause of the error.
-        log.error("Error initializing share partition with key {}", sharePartitionKey, throwable);
-        future.completeExceptionally(throwable);
+    private void maybeCompleteShareFetchWithException(CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
+        Collection<TopicIdPartition> topicIdPartitions, Throwable throwable) {
+        if (!future.isDone()) {
+            future.complete(topicIdPartitions.stream().collect(Collectors.toMap(
+                tp -> tp, tp -> new PartitionData().setErrorCode(Errors.forException(throwable).code()).setErrorMessage(throwable.getMessage()))));
+        }
+    }
+
+    private void completeShareFetchWithException(CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
+        Map<TopicIdPartition, Throwable> erroneous) {
+        future.complete(erroneous.entrySet().stream().collect(Collectors.toMap(
+            Map.Entry::getKey, entry -> new PartitionData().setErrorCode(Errors.forException(entry.getValue()).code()).setErrorMessage(entry.getValue().getMessage()))));
Review Comment:
The line is getting too long. Could we avoid calling `entry.getValue()`
twice?
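For instance, something along these lines (a sketch only, reusing the names from the diff above):

```java
private void completeShareFetchWithException(
    CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
    Map<TopicIdPartition, Throwable> erroneous
) {
    future.complete(erroneous.entrySet().stream().collect(Collectors.toMap(
        Map.Entry::getKey,
        entry -> {
            // Bind the throwable once instead of calling entry.getValue() twice;
            // this also keeps each line at a reasonable length.
            Throwable t = entry.getValue();
            return new PartitionData()
                .setErrorCode(Errors.forException(t).code())
                .setErrorMessage(t.getMessage());
        })));
}
```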
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -540,57 +571,76 @@ void processShareFetch(ShareFetchData shareFetchData) {
return;
}
-        try {
-            shareFetchData.partitionMaxBytes().keySet().forEach(topicIdPartition -> {
-                SharePartitionKey sharePartitionKey = sharePartitionKey(
-                    shareFetchData.groupId(),
-                    topicIdPartition
-                );
-                SharePartition sharePartition = getOrCreateSharePartition(sharePartitionKey);
+        // Initialize lazily, if required.
+        Map<TopicIdPartition, Throwable> erroneous = null;
+        Set<DelayedShareFetchKey> delayedShareFetchWatchKeys = new HashSet<>();
+        for (TopicIdPartition topicIdPartition : shareFetchData.partitionMaxBytes().keySet()) {
+            SharePartitionKey sharePartitionKey = sharePartitionKey(
+                shareFetchData.groupId(),
+                topicIdPartition
+            );
+
+            SharePartition sharePartition;
+            try {
+                sharePartition = getOrCreateSharePartition(sharePartitionKey);
+            } catch (Exception e) {
+                // Complete the whole fetch request with an exception if there is an error processing.
+                // The exception currently can be thrown only if there is an error while initializing
+                // the share partition. But skip the processing for other share partitions in the request
+                // as this situation is not expected.
+                log.error("Error processing share fetch request", e);
+                if (erroneous == null) {
+                    erroneous = new HashMap<>();
Review Comment:
It seems it's more intuitive to initialize erroneous as an empty map so that
we don't need to deal with it being `null`.
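Roughly like this (a sketch against the loop above; only names already present in the diff are used):

```java
// Start with an empty map so neither the catch block nor later call sites
// need to null-check erroneous before using it.
Map<TopicIdPartition, Throwable> erroneous = new HashMap<>();
Set<DelayedShareFetchKey> delayedShareFetchWatchKeys = new HashSet<>();
for (TopicIdPartition topicIdPartition : shareFetchData.partitionMaxBytes().keySet()) {
    SharePartitionKey sharePartitionKey = sharePartitionKey(shareFetchData.groupId(), topicIdPartition);
    SharePartition sharePartition;
    try {
        sharePartition = getOrCreateSharePartition(sharePartitionKey);
    } catch (Exception e) {
        log.error("Error processing share fetch request", e);
        // Record the failure directly; no lazy initialization or null handling needed.
        erroneous.put(topicIdPartition, e);
        continue;
    }
    // ... rest of the per-partition processing from the existing loop ...
}
```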
##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -2157,19 +2153,203 @@ public void testShareFetchProcessingExceptions() throws Exception {
            future::isDone,
            DELAYED_SHARE_FETCH_TIMEOUT_MS,
            () -> "Processing for delayed share fetch request not finished.");
-        assertTrue(future.isCompletedExceptionally());
-        assertFutureThrows(future, RuntimeException.class, "Error creating instance");
+        validateShareFetchFutureException(future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Error creating instance");
+    }
+
+    @Test
+    public void testSharePartitionInitializationFailure() throws Exception {
+        String groupId = "grp";
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        Map<TopicIdPartition, Integer> partitionMaxBytes = Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
+
+        // Send map to check no share partition is created.
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
+        // Validate when partition is not the leader.
+        Partition partition = mock(Partition.class);
+        when(partition.isLeader()).thenReturn(false);
+
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        // First check should throw KafkaStorageException, second check should return partition which
+        // is not leader.
+        when(replicaManager.getPartitionOrException(any()))
+            .thenThrow(new KafkaStorageException("Exception"))
+            .thenReturn(partition);
+        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withReplicaManager(replicaManager)
+            .withPartitionCacheMap(partitionCacheMap)
+            .build();
-        // Throw exception from share partition for second fetch request.
-        when(sp0.maybeInitialize()).thenThrow(new RuntimeException("Error initializing instance"));
+        // Validate when exception is thrown.
+        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
+            sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, partitionMaxBytes);
+        TestUtils.waitForCondition(
+            future::isDone,
+            DELAYED_SHARE_FETCH_TIMEOUT_MS,
+            () -> "Processing for delayed share fetch request not finished.");
+        validateShareFetchFutureException(future, tp0, Errors.KAFKA_STORAGE_ERROR, "Exception");
+        assertTrue(partitionCacheMap.isEmpty());
+        // Validate when partition is not leader.
        future = sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, partitionMaxBytes);
        TestUtils.waitForCondition(
            future::isDone,
            DELAYED_SHARE_FETCH_TIMEOUT_MS,
            () -> "Processing for delayed share fetch request not finished.");
-        assertTrue(future.isCompletedExceptionally());
-        assertFutureThrows(future, RuntimeException.class, "Error initializing instance");
+        validateShareFetchFutureException(future, tp0, Errors.NOT_LEADER_OR_FOLLOWER);
+        assertTrue(partitionCacheMap.isEmpty());
+    }
+
+    @Test
+    public void testSharePartitionPartialInitializationFailure() throws Exception {
+        String groupId = "grp";
+        Uuid memberId1 = Uuid.randomUuid();
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(memberId1, new TopicPartition("foo", 1));
+        Map<TopicIdPartition, Integer> partitionMaxBytes = Map.of(tp0, PARTITION_MAX_BYTES, tp1, PARTITION_MAX_BYTES);
+
+        // Mark partition1 as not the leader.
+        Partition partition1 = mock(Partition.class);
+        when(partition1.isLeader()).thenReturn(false);
+
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        when(replicaManager.getPartitionOrException(any()))
+            .thenReturn(partition1);
+
+        SharePartition sp1 = mock(SharePartition.class);
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
+
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        when(sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
+        when(sp1.acquire(anyString(), anyInt(), any())).thenReturn(new ShareAcquiredRecords(Collections.emptyList(), 0));
+
+        DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
+            "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
+            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+        mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);
+
+        doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withReplicaManager(replicaManager)
+            .withPartitionCacheMap(partitionCacheMap)
+            .build();
+
+        // Validate when exception is thrown.
+        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
+            sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, partitionMaxBytes);
+        assertTrue(future.isDone());
+        assertFalse(future.isCompletedExceptionally());
+
+        Map<TopicIdPartition, PartitionData> partitionDataMap = future.get();
+        // For now only 1 successful partition is included, this will be fixed in subsequents PRs.
+        assertEquals(1, partitionDataMap.size());
+        assertTrue(partitionDataMap.containsKey(tp1));
+        assertEquals(Errors.NONE.code(), partitionDataMap.get(tp1).errorCode());
+
+        Mockito.verify(replicaManager, times(1)).readFromLog(
+            any(), any(), any(ReplicaQuota.class), anyBoolean());
+    }
+
+    @Test
+    public void testReplicaManagerFetchException() {
+        String groupId = "grp";
+        Uuid memberId = Uuid.randomUuid();
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        Map<TopicIdPartition, Integer> partitionMaxBytes = Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
+
+        SharePartition sp0 = mock(SharePartition.class);
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
+
+        DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
+            "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
+            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+        mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
+
+        doThrow(new RuntimeException("Exception")).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withPartitionCacheMap(partitionCacheMap)
+            .withReplicaManager(mockReplicaManager)
+            .withTimer(mockTimer)
+            .build();
+
+        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
+            sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
+        validateShareFetchFutureException(future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Exception");
+        // Verify that the share partition is still in the cache on exception.
+        assertEquals(1, partitionCacheMap.size());
+
+        // Throw NotLeaderOrFollowerException from replica manager fetch which should evict instance from the cache.
+        doThrow(new NotLeaderOrFollowerException("Leader exception")).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
+        validateShareFetchFutureException(future, tp0, Errors.NOT_LEADER_OR_FOLLOWER, "Leader exception");
+        assertTrue(partitionCacheMap.isEmpty());
+    }
+
+    @Test
+    public void testReplicaManagerFetchMultipleSharePartitionsException() {
+        String groupId = "grp";
+        Uuid memberId = Uuid.randomUuid();
+
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 0));
+        Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
+        partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
+        partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
+
+        SharePartition sp0 = mock(SharePartition.class);
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
+
+        SharePartition sp1 = mock(SharePartition.class);
+        // Do not make the share partition acquirable hence it shouldn't be removed from the cache,
+        // as it won't be part of replica manger readFromLog request.
+        when(sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
+
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
+
+        DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
+            "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
+            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+        mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
+
+        // Throw FencedStateEpochException from replica manager fetch which should evict instance from the cache.
+        doThrow(new FencedStateEpochException("Fenced exception")).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withPartitionCacheMap(partitionCacheMap)
+            .withReplicaManager(mockReplicaManager)
+            .withTimer(mockTimer)
+            .build();
+
+        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
+            sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
+        validateShareFetchFutureException(future, tp0, Errors.FENCED_STATE_EPOCH, "Fenced exception");
Review Comment:
Hmm, why does the completed future have only 1 partition?
##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -2157,19 +2153,203 @@ public void testShareFetchProcessingExceptions() throws Exception {
            future::isDone,
            DELAYED_SHARE_FETCH_TIMEOUT_MS,
            () -> "Processing for delayed share fetch request not finished.");
-        assertTrue(future.isCompletedExceptionally());
-        assertFutureThrows(future, RuntimeException.class, "Error creating instance");
+        validateShareFetchFutureException(future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Error creating instance");
+    }
+
+    @Test
+    public void testSharePartitionInitializationFailure() throws Exception {
+        String groupId = "grp";
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        Map<TopicIdPartition, Integer> partitionMaxBytes = Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
+
+        // Send map to check no share partition is created.
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
+        // Validate when partition is not the leader.
+        Partition partition = mock(Partition.class);
+        when(partition.isLeader()).thenReturn(false);
+
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        // First check should throw KafkaStorageException, second check should return partition which
+        // is not leader.
+        when(replicaManager.getPartitionOrException(any()))
+            .thenThrow(new KafkaStorageException("Exception"))
+            .thenReturn(partition);
+        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withReplicaManager(replicaManager)
+            .withPartitionCacheMap(partitionCacheMap)
+            .build();
-        // Throw exception from share partition for second fetch request.
-        when(sp0.maybeInitialize()).thenThrow(new RuntimeException("Error initializing instance"));
+        // Validate when exception is thrown.
+        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
+            sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, partitionMaxBytes);
+        TestUtils.waitForCondition(
+            future::isDone,
+            DELAYED_SHARE_FETCH_TIMEOUT_MS,
+            () -> "Processing for delayed share fetch request not finished.");
+        validateShareFetchFutureException(future, tp0, Errors.KAFKA_STORAGE_ERROR, "Exception");
+        assertTrue(partitionCacheMap.isEmpty());
+        // Validate when partition is not leader.
        future = sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, partitionMaxBytes);
        TestUtils.waitForCondition(
            future::isDone,
            DELAYED_SHARE_FETCH_TIMEOUT_MS,
            () -> "Processing for delayed share fetch request not finished.");
-        assertTrue(future.isCompletedExceptionally());
-        assertFutureThrows(future, RuntimeException.class, "Error initializing instance");
+        validateShareFetchFutureException(future, tp0, Errors.NOT_LEADER_OR_FOLLOWER);
+        assertTrue(partitionCacheMap.isEmpty());
+    }
+
+    @Test
+    public void testSharePartitionPartialInitializationFailure() throws Exception {
+        String groupId = "grp";
+        Uuid memberId1 = Uuid.randomUuid();
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(memberId1, new TopicPartition("foo", 1));
+        Map<TopicIdPartition, Integer> partitionMaxBytes = Map.of(tp0, PARTITION_MAX_BYTES, tp1, PARTITION_MAX_BYTES);
+
+        // Mark partition1 as not the leader.
+        Partition partition1 = mock(Partition.class);
+        when(partition1.isLeader()).thenReturn(false);
+
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        when(replicaManager.getPartitionOrException(any()))
+            .thenReturn(partition1);
+
+        SharePartition sp1 = mock(SharePartition.class);
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
+
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        when(sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
+        when(sp1.acquire(anyString(), anyInt(), any())).thenReturn(new ShareAcquiredRecords(Collections.emptyList(), 0));
+
+        DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
+            "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
+            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+        mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);
+
+        doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withReplicaManager(replicaManager)
+            .withPartitionCacheMap(partitionCacheMap)
+            .build();
+
+        // Validate when exception is thrown.
+        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
+            sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, partitionMaxBytes);
+        assertTrue(future.isDone());
+        assertFalse(future.isCompletedExceptionally());
+
+        Map<TopicIdPartition, PartitionData> partitionDataMap = future.get();
+        // For now only 1 successful partition is included, this will be fixed in subsequents PRs.
+        assertEquals(1, partitionDataMap.size());
+        assertTrue(partitionDataMap.containsKey(tp1));
+        assertEquals(Errors.NONE.code(), partitionDataMap.get(tp1).errorCode());
+
+        Mockito.verify(replicaManager, times(1)).readFromLog(
+            any(), any(), any(ReplicaQuota.class), anyBoolean());
+    }
+
+    @Test
+    public void testReplicaManagerFetchException() {
+        String groupId = "grp";
+        Uuid memberId = Uuid.randomUuid();
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        Map<TopicIdPartition, Integer> partitionMaxBytes = Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
+
+        SharePartition sp0 = mock(SharePartition.class);
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
+
+        DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
+            "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
+            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+        mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
+
+        doThrow(new RuntimeException("Exception")).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withPartitionCacheMap(partitionCacheMap)
+            .withReplicaManager(mockReplicaManager)
+            .withTimer(mockTimer)
+            .build();
+
+        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
+            sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
+        validateShareFetchFutureException(future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Exception");
+        // Verify that the share partition is still in the cache on exception.
+        assertEquals(1, partitionCacheMap.size());
+
+        // Throw NotLeaderOrFollowerException from replica manager fetch which should evict instance from the cache.
+        doThrow(new NotLeaderOrFollowerException("Leader exception")).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
+        validateShareFetchFutureException(future, tp0, Errors.NOT_LEADER_OR_FOLLOWER, "Leader exception");
+        assertTrue(partitionCacheMap.isEmpty());
+    }
+
+    @Test
+    public void testReplicaManagerFetchMultipleSharePartitionsException() {
+        String groupId = "grp";
+        Uuid memberId = Uuid.randomUuid();
+
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 0));
+        Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
+        partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
+        partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
+
+        SharePartition sp0 = mock(SharePartition.class);
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
+
+        SharePartition sp1 = mock(SharePartition.class);
+        // Do not make the share partition acquirable hence it shouldn't be removed from the cache,
Review Comment:
Hmm, should we explicitly mock `sp1.maybeAcquireFetchLock` to false?
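If so, that would just be one extra stubbing in the test setup; Mockito already returns `false` for unstubbed boolean methods, so the following (a sketch) only makes the intent explicit:

```java
// Explicitly stub the lock acquisition to false so the test's intent is visible,
// rather than relying on Mockito's default return value.
when(sp1.maybeAcquireFetchLock()).thenReturn(false);
```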
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1053,6 +1085,22 @@ void releaseFetchLock() {
fetchLock.set(false);
}
+    /**
+     * Marks the share partition as fenced.
+     */
+    void markFenced() {
+        lock.writeLock().lock();
+        try {
+            partitionState = SharePartitionState.FENCED;
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private boolean stateNotActive() {
+        return partitionState() != SharePartitionState.ACTIVE;
Review Comment:
We probably should throw a fenced exception and let the caller handle it.
This can be done in a separate PR.
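For a follow-up PR, that could look roughly like the following (a sketch only; the method name `throwIfNotActive` is assumed and not part of this change):

```java
// Sketch: throw instead of returning a boolean, so callers must handle a fenced
// share partition explicitly rather than silently skipping it.
private void throwIfNotActive() {
    SharePartitionState state = partitionState();
    if (state != SharePartitionState.ACTIVE) {
        throw new FencedStateEpochException("Share partition is not active, current state: " + state);
    }
}
```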
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]