kirktrue commented on code in PR #14640:
URL: https://github.com/apache/kafka/pull/14640#discussion_r1423134165
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##########
@@ -790,6 +791,297 @@ public void
testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
verify(membershipManager, never()).transitionToJoining();
}
+ @Test
+ public void testListenerCallbacksBasic() {
+ // Step 1: set up mocks
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ CounterConsumerRebalanceListener listener = new
CounterConsumerRebalanceListener();
+ ConsumerRebalanceListenerInvoker invoker =
consumerRebalanceListenerInvoker();
+
+ String topicName = "topic1";
+ Uuid topicId = Uuid.randomUuid();
+
+
when(subscriptionState.assignedPartitions()).thenReturn(Collections.emptySet());
+ when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+ doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId,
topicName));
+
+ // Step 2: put the state machine into the appropriate... state
+
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId,
topicName));
+ receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+ assertEquals(Collections.emptySet(),
membershipManager.currentAssignment());
+ assertTrue(membershipManager.reconciliationInProgress());
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(0, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 3: assign partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+ topicPartitions(topicName, 0, 1)
+ );
+
+ assertFalse(membershipManager.reconciliationInProgress());
+
+ // Step 4: Send ack and make sure we're done and our listener was
called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.STABLE, membershipManager.state());
+ assertEquals(topicIdPartitions(topicId, topicName, 0, 1),
membershipManager.currentAssignment());
+
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(1, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+
+ // Step 5: receive an empty assignment, which means we should call
revoke
+
when(subscriptionState.assignedPartitions()).thenReturn(topicPartitions(topicName,
0, 1));
+ receiveEmptyAssignment(membershipManager);
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 6: revoke partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED,
+ topicPartitions(topicName, 0, 1)
+ );
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 7: assign partitions should still be called, even though it's
empty
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+ Collections.emptySortedSet()
+ );
+ assertFalse(membershipManager.reconciliationInProgress());
+
+ // Step 8: Send ack and make sure we're done and our listener was
called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.STABLE, membershipManager.state());
+ assertFalse(membershipManager.reconciliationInProgress());
+
+ assertEquals(1, listener.revokedCounter.get());
+ assertEquals(2, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+ }
+
+ // TODO: Reconciliation needs to support when a listener throws an error
on onPartitionsRevoked(). When that
+ // happens, the assignment step is skipped, which means
onPartitionsAssigned() is never run.
+ // The jury is out on whether or not this is a bug or intentional.
+ //
+ // See
https://github.com/apache/kafka/pull/14640#discussion_r1421253120 for more
details.
+ // @Test
+ public void testListenerCallbacksThrowsErrorOnPartitionsRevoked() {
+ // Step 1: set up mocks
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ mockOwnedPartition("topic1", 0);
+ CounterConsumerRebalanceListener listener = new
CounterConsumerRebalanceListener(
+ Optional.of(new IllegalArgumentException("Intentional
onPartitionsRevoked() error")),
+ Optional.empty(),
+ Optional.empty()
+ );
+ ConsumerRebalanceListenerInvoker invoker =
consumerRebalanceListenerInvoker();
+
+
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+ doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
+ // Step 2: put the state machine into the appropriate... state
+ receiveEmptyAssignment(membershipManager);
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+ assertEquals(Collections.emptySet(),
membershipManager.currentAssignment());
+ assertTrue(membershipManager.reconciliationInProgress());
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(0, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 3: revoke partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED,
+ topicPartitions("topic1", 0)
+ );
+
+ assertFalse(membershipManager.reconciliationInProgress());
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+ // Step 4: Send ack and make sure we're done and our listener was
called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+ assertEquals(1, listener.revokedCounter.get());
+ assertEquals(1, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+ }
+
+ @Test
+ public void testListenerCallbacksThrowsErrorOnPartitionsAssigned() {
+ // Step 1: set up mocks
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ mockOwnedPartition("topic1", 0);
+ CounterConsumerRebalanceListener listener = new
CounterConsumerRebalanceListener(
+ Optional.empty(),
+ Optional.of(new IllegalArgumentException("Intentional
onPartitionsAssigned() error")),
+ Optional.empty()
+ );
+ ConsumerRebalanceListenerInvoker invoker =
consumerRebalanceListenerInvoker();
+
+
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+ doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
+ // Step 2: put the state machine into the appropriate... state
+ receiveEmptyAssignment(membershipManager);
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+ assertEquals(Collections.emptySet(),
membershipManager.currentAssignment());
+ assertTrue(membershipManager.reconciliationInProgress());
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(0, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 3: revoke partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED,
+ topicPartitions("topic1", 0)
+ );
+
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 4: assign partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+ Collections.emptySortedSet()
+ );
+
+ assertFalse(membershipManager.reconciliationInProgress());
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+ // Step 5: Send ack and make sure we're done and our listener was
called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+ assertEquals(1, listener.revokedCounter.get());
+ assertEquals(1, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+ }
+
+ @Test
+ public void testOnPartitionsLostNoError() {
+ mockOwnedPartition("topic1", 0);
+ testOnPartitionsLost(Optional.empty());
+ }
+
+ @Test
+ public void testOnPartitionsLostError() {
+ mockOwnedPartition("topic1", 0);
+ testOnPartitionsLost(Optional.of(new KafkaException("Intentional error
for test")));
+ }
+
+ private void testOnPartitionsLost(Optional<RuntimeException> lostError) {
+ // Step 1: set up mocks
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ CounterConsumerRebalanceListener listener = new
CounterConsumerRebalanceListener(
+ Optional.empty(),
+ Optional.empty(),
+ lostError
+ );
+ ConsumerRebalanceListenerInvoker invoker =
consumerRebalanceListenerInvoker();
+
+
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+ doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
+ // Step 2: put the state machine into the appropriate... state
+ membershipManager.transitionToFenced();
+ assertEquals(MemberState.FENCED, membershipManager.state());
+ assertEquals(Collections.emptySet(),
membershipManager.currentAssignment());
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(0, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+
+ // Step 3: invoke the callback
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST,
+ topicPartitions("topic1", 0)
+ );
+
+ // Step 4: Receive ack and make sure we're done and our listener was
called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.JOINING, membershipManager.state());
+
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(0, listener.assignedCounter.get());
+ assertEquals(1, listener.lostCounter.get());
+ }
+
+ private ConsumerRebalanceListenerInvoker
consumerRebalanceListenerInvoker() {
+ ConsumerCoordinatorMetrics coordinatorMetrics = new
ConsumerCoordinatorMetrics(
+ subscriptionState,
+ new Metrics(),
+ "test-");
+ return new ConsumerRebalanceListenerInvoker(
+ new LogContext(),
+ subscriptionState,
+ new MockTime(1),
+ coordinatorMetrics
+ );
+ }
+
+ private SortedSet<TopicPartition> topicPartitions(String topicName, int...
partitions) {
+ SortedSet<TopicPartition> topicPartitions = new TreeSet<>(new
Utils.TopicPartitionComparator());
+
+ for (int partition : partitions)
+ topicPartitions.add(new TopicPartition(topicName, partition));
+
+ return topicPartitions;
+ }
+
+ private SortedSet<TopicIdPartition> topicIdPartitions(Uuid topicId, String
topicName, int... partitions) {
+ SortedSet<TopicIdPartition> topicIdPartitions = new TreeSet<>(new
Utils.TopicIdPartitionComparator());
+
+ for (int partition : partitions)
+ topicIdPartitions.add(new TopicIdPartition(topicId, new
TopicPartition(topicName, partition)));
+
+ return topicIdPartitions;
+ }
+
+ private void performCallback(MembershipManagerImpl membershipManager,
+ ConsumerRebalanceListenerInvoker invoker,
+ ConsumerRebalanceListenerMethodName
expectedMethodName,
+ SortedSet<TopicPartition> expectedPartitions)
{
+ // We expect only our enqueued event in the background queue.
+ assertEquals(1, backgroundEventQueue.size());
+ assertNotNull(backgroundEventQueue.peek());
+ assertInstanceOf(ConsumerRebalanceListenerCallbackNeededEvent.class,
backgroundEventQueue.peek());
+ ConsumerRebalanceListenerCallbackNeededEvent neededEvent =
(ConsumerRebalanceListenerCallbackNeededEvent) backgroundEventQueue.poll();
+ assertNotNull(neededEvent);
+ assertEquals(expectedMethodName, neededEvent.methodName());
+ assertEquals(expectedPartitions, neededEvent.partitions());
+
+ ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent =
invokeRebalanceCallbacks(
+ invoker,
+ neededEvent.methodName(),
+ neededEvent.partitions(),
+ neededEvent.future()
+ );
Review Comment:
Sorry for adding to the confusion 😞
Yes, I could just invoke the method on the `ConsumerRebalanceListener`
instance directly, but by structuring the test this way, it exercises the
following production code to help ensure correctness:
* `AsyncKafkaConsumer.invokeRebalanceCallbacks()` tests the correctness of
the relevant `ConsumerRebalanceListenerMethodName`, error, and event handling
logic
* `ConsumerRebalanceListenerInvoker.invokePartitions*` tests the
correctness of the relevant `ConsumerRebalanceListener`, `SubscriptionState`,
`Optional`, metrics, and log handling logic
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##########
@@ -790,6 +791,297 @@ public void
testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
verify(membershipManager, never()).transitionToJoining();
}
+ @Test
+ public void testListenerCallbacksBasic() {
+ // Step 1: set up mocks
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ CounterConsumerRebalanceListener listener = new
CounterConsumerRebalanceListener();
+ ConsumerRebalanceListenerInvoker invoker =
consumerRebalanceListenerInvoker();
+
+ String topicName = "topic1";
+ Uuid topicId = Uuid.randomUuid();
+
+
when(subscriptionState.assignedPartitions()).thenReturn(Collections.emptySet());
+ when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+ doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId,
topicName));
+
+ // Step 2: put the state machine into the appropriate... state
+
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId,
topicName));
+ receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+ assertEquals(Collections.emptySet(),
membershipManager.currentAssignment());
+ assertTrue(membershipManager.reconciliationInProgress());
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(0, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 3: assign partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+ topicPartitions(topicName, 0, 1)
+ );
+
+ assertFalse(membershipManager.reconciliationInProgress());
+
+ // Step 4: Send ack and make sure we're done and our listener was
called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.STABLE, membershipManager.state());
+ assertEquals(topicIdPartitions(topicId, topicName, 0, 1),
membershipManager.currentAssignment());
+
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(1, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+
+ // Step 5: receive an empty assignment, which means we should call
revoke
+
when(subscriptionState.assignedPartitions()).thenReturn(topicPartitions(topicName,
0, 1));
+ receiveEmptyAssignment(membershipManager);
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 6: revoke partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED,
+ topicPartitions(topicName, 0, 1)
+ );
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 7: assign partitions should still be called, even though it's
empty
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+ Collections.emptySortedSet()
+ );
+ assertFalse(membershipManager.reconciliationInProgress());
+
+ // Step 8: Send ack and make sure we're done and our listener was
called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.STABLE, membershipManager.state());
+ assertFalse(membershipManager.reconciliationInProgress());
+
+ assertEquals(1, listener.revokedCounter.get());
+ assertEquals(2, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+ }
+
+ // TODO: Reconciliation needs to support when a listener throws an error
on onPartitionsRevoked(). When that
+ // happens, the assignment step is skipped, which means
onPartitionsAssigned() is never run.
+ // The jury is out on whether or not this is a bug or intentional.
+ //
+ // See
https://github.com/apache/kafka/pull/14640#discussion_r1421253120 for more
details.
+ // @Test
+ public void testListenerCallbacksThrowsErrorOnPartitionsRevoked() {
+ // Step 1: set up mocks
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ mockOwnedPartition("topic1", 0);
+ CounterConsumerRebalanceListener listener = new
CounterConsumerRebalanceListener(
+ Optional.of(new IllegalArgumentException("Intentional
onPartitionsRevoked() error")),
+ Optional.empty(),
+ Optional.empty()
+ );
+ ConsumerRebalanceListenerInvoker invoker =
consumerRebalanceListenerInvoker();
+
+
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+ doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
+ // Step 2: put the state machine into the appropriate... state
+ receiveEmptyAssignment(membershipManager);
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+ assertEquals(Collections.emptySet(),
membershipManager.currentAssignment());
+ assertTrue(membershipManager.reconciliationInProgress());
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(0, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 3: revoke partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED,
+ topicPartitions("topic1", 0)
+ );
+
+ assertFalse(membershipManager.reconciliationInProgress());
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+ // Step 4: Send ack and make sure we're done and our listener was
called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+ assertEquals(1, listener.revokedCounter.get());
+ assertEquals(1, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+ }
+
+ @Test
+ public void testListenerCallbacksThrowsErrorOnPartitionsAssigned() {
+ // Step 1: set up mocks
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ mockOwnedPartition("topic1", 0);
+ CounterConsumerRebalanceListener listener = new
CounterConsumerRebalanceListener(
+ Optional.empty(),
+ Optional.of(new IllegalArgumentException("Intentional
onPartitionsAssigned() error")),
+ Optional.empty()
+ );
+ ConsumerRebalanceListenerInvoker invoker =
consumerRebalanceListenerInvoker();
+
+
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+ doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
+ // Step 2: put the state machine into the appropriate... state
+ receiveEmptyAssignment(membershipManager);
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+ assertEquals(Collections.emptySet(),
membershipManager.currentAssignment());
+ assertTrue(membershipManager.reconciliationInProgress());
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(0, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 3: revoke partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED,
+ topicPartitions("topic1", 0)
+ );
+
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 4: assign partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+ Collections.emptySortedSet()
+ );
+
+ assertFalse(membershipManager.reconciliationInProgress());
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+ // Step 5: Send ack and make sure we're done and our listener was
called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+ assertEquals(1, listener.revokedCounter.get());
+ assertEquals(1, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+ }
+
+ @Test
+ public void testOnPartitionsLostNoError() {
+ mockOwnedPartition("topic1", 0);
+ testOnPartitionsLost(Optional.empty());
+ }
+
+ @Test
+ public void testOnPartitionsLostError() {
+ mockOwnedPartition("topic1", 0);
+ testOnPartitionsLost(Optional.of(new KafkaException("Intentional error
for test")));
+ }
+
+ private void testOnPartitionsLost(Optional<RuntimeException> lostError) {
+ // Step 1: set up mocks
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ CounterConsumerRebalanceListener listener = new
CounterConsumerRebalanceListener(
+ Optional.empty(),
+ Optional.empty(),
+ lostError
+ );
+ ConsumerRebalanceListenerInvoker invoker =
consumerRebalanceListenerInvoker();
+
+
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+ doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
+ // Step 2: put the state machine into the appropriate... state
+ membershipManager.transitionToFenced();
+ assertEquals(MemberState.FENCED, membershipManager.state());
+ assertEquals(Collections.emptySet(),
membershipManager.currentAssignment());
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(0, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+
+ // Step 3: invoke the callback
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST,
+ topicPartitions("topic1", 0)
+ );
+
+ // Step 4: Receive ack and make sure we're done and our listener was
called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.JOINING, membershipManager.state());
+
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(0, listener.assignedCounter.get());
+ assertEquals(1, listener.lostCounter.get());
+ }
+
+ private ConsumerRebalanceListenerInvoker
consumerRebalanceListenerInvoker() {
+ ConsumerCoordinatorMetrics coordinatorMetrics = new
ConsumerCoordinatorMetrics(
+ subscriptionState,
+ new Metrics(),
+ "test-");
+ return new ConsumerRebalanceListenerInvoker(
+ new LogContext(),
+ subscriptionState,
+ new MockTime(1),
+ coordinatorMetrics
+ );
+ }
+
+ private SortedSet<TopicPartition> topicPartitions(String topicName, int...
partitions) {
+ SortedSet<TopicPartition> topicPartitions = new TreeSet<>(new
Utils.TopicPartitionComparator());
+
+ for (int partition : partitions)
+ topicPartitions.add(new TopicPartition(topicName, partition));
+
+ return topicPartitions;
+ }
+
+ private SortedSet<TopicIdPartition> topicIdPartitions(Uuid topicId, String
topicName, int... partitions) {
+ SortedSet<TopicIdPartition> topicIdPartitions = new TreeSet<>(new
Utils.TopicIdPartitionComparator());
+
+ for (int partition : partitions)
+ topicIdPartitions.add(new TopicIdPartition(topicId, new
TopicPartition(topicName, partition)));
+
+ return topicIdPartitions;
+ }
+
+ private void performCallback(MembershipManagerImpl membershipManager,
+ ConsumerRebalanceListenerInvoker invoker,
+ ConsumerRebalanceListenerMethodName
expectedMethodName,
+ SortedSet<TopicPartition> expectedPartitions)
{
+ // We expect only our enqueued event in the background queue.
+ assertEquals(1, backgroundEventQueue.size());
+ assertNotNull(backgroundEventQueue.peek());
+ assertInstanceOf(ConsumerRebalanceListenerCallbackNeededEvent.class,
backgroundEventQueue.peek());
+ ConsumerRebalanceListenerCallbackNeededEvent neededEvent =
(ConsumerRebalanceListenerCallbackNeededEvent) backgroundEventQueue.poll();
+ assertNotNull(neededEvent);
+ assertEquals(expectedMethodName, neededEvent.methodName());
+ assertEquals(expectedPartitions, neededEvent.partitions());
+
+ ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent =
invokeRebalanceCallbacks(
+ invoker,
+ neededEvent.methodName(),
+ neededEvent.partitions(),
+ neededEvent.future()
+ );
Review Comment:
Sorry for adding to the confusion 😞
Yes, I could just invoke the method on the `ConsumerRebalanceListener`
instance directly, but by structuring the test this way, it exercises the
following production code to help ensure correctness:
* `AsyncKafkaConsumer.invokeRebalanceCallbacks()` tests the correctness of
the relevant `ConsumerRebalanceListenerMethodName`, error, and event handling
logic
* `ConsumerRebalanceListenerInvoker.invokePartitions*` tests the
correctness of the relevant `ConsumerRebalanceListener`, `SubscriptionState`,
`Optional`, metrics, and log handling logic
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]