kirktrue commented on code in PR #14640:
URL: https://github.com/apache/kafka/pull/14640#discussion_r1424283196
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##########
@@ -790,6 +791,297 @@ public void
testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
verify(membershipManager, never()).transitionToJoining();
}
+ @Test
+ public void testListenerCallbacksBasic() {
+ // Step 1: set up mocks
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ CounterConsumerRebalanceListener listener = new
CounterConsumerRebalanceListener();
+ ConsumerRebalanceListenerInvoker invoker =
consumerRebalanceListenerInvoker();
+
+ String topicName = "topic1";
+ Uuid topicId = Uuid.randomUuid();
+
+
when(subscriptionState.assignedPartitions()).thenReturn(Collections.emptySet());
+ when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+ doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId,
topicName));
+
+ // Step 2: put the state machine into the appropriate... state
+
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId,
topicName));
+ receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+ assertEquals(Collections.emptySet(),
membershipManager.currentAssignment());
+ assertTrue(membershipManager.reconciliationInProgress());
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(0, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 3: assign partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+ topicPartitions(topicName, 0, 1)
+ );
+
+ assertFalse(membershipManager.reconciliationInProgress());
+
+ // Step 4: Send ack and make sure we're done and our listener was
called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.STABLE, membershipManager.state());
+ assertEquals(topicIdPartitions(topicId, topicName, 0, 1),
membershipManager.currentAssignment());
+
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(1, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+
+ // Step 5: receive an empty assignment, which means we should call
revoke
+
when(subscriptionState.assignedPartitions()).thenReturn(topicPartitions(topicName,
0, 1));
+ receiveEmptyAssignment(membershipManager);
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 6: revoke partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED,
+ topicPartitions(topicName, 0, 1)
+ );
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 7: assign partitions should still be called, even though it's
empty
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+ Collections.emptySortedSet()
+ );
+ assertFalse(membershipManager.reconciliationInProgress());
+
+ // Step 8: Send ack and make sure we're done and our listener was
called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.STABLE, membershipManager.state());
+ assertFalse(membershipManager.reconciliationInProgress());
+
+ assertEquals(1, listener.revokedCounter.get());
+ assertEquals(2, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+ }
+
+ // TODO: Reconciliation needs to support when a listener throws an error
on onPartitionsRevoked(). When that
+ // happens, the assignment step is skipped, which means
onPartitionsAssigned() is never run.
+ // The jury is out on whether or not this is a bug or intentional.
+ //
+ // See
https://github.com/apache/kafka/pull/14640#discussion_r1421253120 for more
details.
+ // @Test
+ public void testListenerCallbacksThrowsErrorOnPartitionsRevoked() {
+ // Step 1: set up mocks
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ mockOwnedPartition("topic1", 0);
+ CounterConsumerRebalanceListener listener = new
CounterConsumerRebalanceListener(
+ Optional.of(new IllegalArgumentException("Intentional
onPartitionsRevoked() error")),
+ Optional.empty(),
+ Optional.empty()
+ );
+ ConsumerRebalanceListenerInvoker invoker =
consumerRebalanceListenerInvoker();
+
+
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+ doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
+ // Step 2: put the state machine into the appropriate... state
+ receiveEmptyAssignment(membershipManager);
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+ assertEquals(Collections.emptySet(),
membershipManager.currentAssignment());
+ assertTrue(membershipManager.reconciliationInProgress());
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(0, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 3: revoke partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED,
+ topicPartitions("topic1", 0)
+ );
+
+ assertFalse(membershipManager.reconciliationInProgress());
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+ // Step 4: Send ack and make sure we're done and our listener was
called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+ assertEquals(1, listener.revokedCounter.get());
+ assertEquals(1, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+ }
+
+ @Test
+ public void testListenerCallbacksThrowsErrorOnPartitionsAssigned() {
+ // Step 1: set up mocks
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ mockOwnedPartition("topic1", 0);
+ CounterConsumerRebalanceListener listener = new
CounterConsumerRebalanceListener(
+ Optional.empty(),
+ Optional.of(new IllegalArgumentException("Intentional
onPartitionsAssigned() error")),
+ Optional.empty()
+ );
+ ConsumerRebalanceListenerInvoker invoker =
consumerRebalanceListenerInvoker();
+
+
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+ doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
+ // Step 2: put the state machine into the appropriate... state
+ receiveEmptyAssignment(membershipManager);
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+ assertEquals(Collections.emptySet(),
membershipManager.currentAssignment());
+ assertTrue(membershipManager.reconciliationInProgress());
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(0, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 3: revoke partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED,
+ topicPartitions("topic1", 0)
+ );
+
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 4: assign partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+ Collections.emptySortedSet()
+ );
+
+ assertFalse(membershipManager.reconciliationInProgress());
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+ // Step 5: Send ack and make sure we're done and our listener was
called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+ assertEquals(1, listener.revokedCounter.get());
+ assertEquals(1, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+ }
+
+ @Test
+ public void testOnPartitionsLostNoError() {
+ mockOwnedPartition("topic1", 0);
+ testOnPartitionsLost(Optional.empty());
+ }
+
+ @Test
+ public void testOnPartitionsLostError() {
+ mockOwnedPartition("topic1", 0);
+ testOnPartitionsLost(Optional.of(new KafkaException("Intentional error
for test")));
+ }
+
+ private void testOnPartitionsLost(Optional<RuntimeException> lostError) {
+ // Step 1: set up mocks
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ CounterConsumerRebalanceListener listener = new
CounterConsumerRebalanceListener(
+ Optional.empty(),
+ Optional.empty(),
+ lostError
+ );
+ ConsumerRebalanceListenerInvoker invoker =
consumerRebalanceListenerInvoker();
+
+
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+ doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
+ // Step 2: put the state machine into the appropriate... state
+ membershipManager.transitionToFenced();
+ assertEquals(MemberState.FENCED, membershipManager.state());
+ assertEquals(Collections.emptySet(),
membershipManager.currentAssignment());
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(0, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+
+ // Step 3: invoke the callback
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST,
+ topicPartitions("topic1", 0)
+ );
+
+ // Step 4: Receive ack and make sure we're done and our listener was
called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.JOINING, membershipManager.state());
+
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(0, listener.assignedCounter.get());
+ assertEquals(1, listener.lostCounter.get());
+ }
+
+ private ConsumerRebalanceListenerInvoker
consumerRebalanceListenerInvoker() {
+ ConsumerCoordinatorMetrics coordinatorMetrics = new
ConsumerCoordinatorMetrics(
+ subscriptionState,
+ new Metrics(),
+ "test-");
+ return new ConsumerRebalanceListenerInvoker(
+ new LogContext(),
+ subscriptionState,
+ new MockTime(1),
+ coordinatorMetrics
+ );
+ }
+
+ private SortedSet<TopicPartition> topicPartitions(String topicName, int...
partitions) {
+ SortedSet<TopicPartition> topicPartitions = new TreeSet<>(new
Utils.TopicPartitionComparator());
+
+ for (int partition : partitions)
+ topicPartitions.add(new TopicPartition(topicName, partition));
+
+ return topicPartitions;
+ }
+
+ private SortedSet<TopicIdPartition> topicIdPartitions(Uuid topicId, String
topicName, int... partitions) {
+ SortedSet<TopicIdPartition> topicIdPartitions = new TreeSet<>(new
Utils.TopicIdPartitionComparator());
+
+ for (int partition : partitions)
+ topicIdPartitions.add(new TopicIdPartition(topicId, new
TopicPartition(topicName, partition)));
+
+ return topicIdPartitions;
+ }
+
+ private void performCallback(MembershipManagerImpl membershipManager,
+ ConsumerRebalanceListenerInvoker invoker,
+ ConsumerRebalanceListenerMethodName
expectedMethodName,
+ SortedSet<TopicPartition> expectedPartitions)
{
+ // We expect only our enqueued event in the background queue.
+ assertEquals(1, backgroundEventQueue.size());
+ assertNotNull(backgroundEventQueue.peek());
+ assertInstanceOf(ConsumerRebalanceListenerCallbackNeededEvent.class,
backgroundEventQueue.peek());
+ ConsumerRebalanceListenerCallbackNeededEvent neededEvent =
(ConsumerRebalanceListenerCallbackNeededEvent) backgroundEventQueue.poll();
+ assertNotNull(neededEvent);
+ assertEquals(expectedMethodName, neededEvent.methodName());
+ assertEquals(expectedPartitions, neededEvent.partitions());
+
+ ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent =
invokeRebalanceCallbacks(
+ invoker,
+ neededEvent.methodName(),
+ neededEvent.partitions(),
+ neededEvent.future()
+ );
Review Comment:
Agreed. The boundaries and scope of the unit and integration tests isn't
consistent. As the codebase has grown, we haven't been as careful as we should
to maintain ease of unit testing 😞
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##########
@@ -790,6 +791,297 @@ public void
testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
verify(membershipManager, never()).transitionToJoining();
}
+ @Test
+ public void testListenerCallbacksBasic() {
+ // Step 1: set up mocks
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ CounterConsumerRebalanceListener listener = new
CounterConsumerRebalanceListener();
+ ConsumerRebalanceListenerInvoker invoker =
consumerRebalanceListenerInvoker();
+
+ String topicName = "topic1";
+ Uuid topicId = Uuid.randomUuid();
+
+
when(subscriptionState.assignedPartitions()).thenReturn(Collections.emptySet());
+ when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+ doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId,
topicName));
+
+ // Step 2: put the state machine into the appropriate... state
+
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId,
topicName));
+ receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+ assertEquals(Collections.emptySet(),
membershipManager.currentAssignment());
+ assertTrue(membershipManager.reconciliationInProgress());
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(0, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 3: assign partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+ topicPartitions(topicName, 0, 1)
+ );
+
+ assertFalse(membershipManager.reconciliationInProgress());
+
+ // Step 4: Send ack and make sure we're done and our listener was
called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.STABLE, membershipManager.state());
+ assertEquals(topicIdPartitions(topicId, topicName, 0, 1),
membershipManager.currentAssignment());
+
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(1, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+
+ // Step 5: receive an empty assignment, which means we should call
revoke
+
when(subscriptionState.assignedPartitions()).thenReturn(topicPartitions(topicName,
0, 1));
+ receiveEmptyAssignment(membershipManager);
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 6: revoke partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED,
+ topicPartitions(topicName, 0, 1)
+ );
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 7: assign partitions should still be called, even though it's
empty
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+ Collections.emptySortedSet()
+ );
+ assertFalse(membershipManager.reconciliationInProgress());
+
+ // Step 8: Send ack and make sure we're done and our listener was
called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.STABLE, membershipManager.state());
+ assertFalse(membershipManager.reconciliationInProgress());
+
+ assertEquals(1, listener.revokedCounter.get());
+ assertEquals(2, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+ }
+
+ // TODO: Reconciliation needs to support when a listener throws an error
on onPartitionsRevoked(). When that
+ // happens, the assignment step is skipped, which means
onPartitionsAssigned() is never run.
+ // The jury is out on whether or not this is a bug or intentional.
+ //
+ // See
https://github.com/apache/kafka/pull/14640#discussion_r1421253120 for more
details.
+ // @Test
+ public void testListenerCallbacksThrowsErrorOnPartitionsRevoked() {
+ // Step 1: set up mocks
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ mockOwnedPartition("topic1", 0);
+ CounterConsumerRebalanceListener listener = new
CounterConsumerRebalanceListener(
+ Optional.of(new IllegalArgumentException("Intentional
onPartitionsRevoked() error")),
+ Optional.empty(),
+ Optional.empty()
+ );
+ ConsumerRebalanceListenerInvoker invoker =
consumerRebalanceListenerInvoker();
+
+
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+ doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
+ // Step 2: put the state machine into the appropriate... state
+ receiveEmptyAssignment(membershipManager);
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+ assertEquals(Collections.emptySet(),
membershipManager.currentAssignment());
+ assertTrue(membershipManager.reconciliationInProgress());
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(0, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 3: revoke partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED,
+ topicPartitions("topic1", 0)
+ );
+
+ assertFalse(membershipManager.reconciliationInProgress());
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+ // Step 4: Send ack and make sure we're done and our listener was
called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+ assertEquals(1, listener.revokedCounter.get());
+ assertEquals(1, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+ }
+
+ @Test
+ public void testListenerCallbacksThrowsErrorOnPartitionsAssigned() {
+ // Step 1: set up mocks
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ mockOwnedPartition("topic1", 0);
+ CounterConsumerRebalanceListener listener = new
CounterConsumerRebalanceListener(
+ Optional.empty(),
+ Optional.of(new IllegalArgumentException("Intentional
onPartitionsAssigned() error")),
+ Optional.empty()
+ );
+ ConsumerRebalanceListenerInvoker invoker =
consumerRebalanceListenerInvoker();
+
+
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+ doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
+ // Step 2: put the state machine into the appropriate... state
+ receiveEmptyAssignment(membershipManager);
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+ assertEquals(Collections.emptySet(),
membershipManager.currentAssignment());
+ assertTrue(membershipManager.reconciliationInProgress());
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(0, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 3: revoke partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED,
+ topicPartitions("topic1", 0)
+ );
+
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 4: assign partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+ Collections.emptySortedSet()
+ );
+
+ assertFalse(membershipManager.reconciliationInProgress());
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+ // Step 5: Send ack and make sure we're done and our listener was
called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+ assertEquals(1, listener.revokedCounter.get());
+ assertEquals(1, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+ }
+
+ @Test
+ public void testOnPartitionsLostNoError() {
+ mockOwnedPartition("topic1", 0);
+ testOnPartitionsLost(Optional.empty());
+ }
+
+ @Test
+ public void testOnPartitionsLostError() {
+ mockOwnedPartition("topic1", 0);
+ testOnPartitionsLost(Optional.of(new KafkaException("Intentional error
for test")));
+ }
+
+ private void testOnPartitionsLost(Optional<RuntimeException> lostError) {
+ // Step 1: set up mocks
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ CounterConsumerRebalanceListener listener = new
CounterConsumerRebalanceListener(
+ Optional.empty(),
+ Optional.empty(),
+ lostError
+ );
+ ConsumerRebalanceListenerInvoker invoker =
consumerRebalanceListenerInvoker();
+
+
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+ doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
+ // Step 2: put the state machine into the appropriate... state
+ membershipManager.transitionToFenced();
+ assertEquals(MemberState.FENCED, membershipManager.state());
+ assertEquals(Collections.emptySet(),
membershipManager.currentAssignment());
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(0, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+
+ // Step 3: invoke the callback
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST,
+ topicPartitions("topic1", 0)
+ );
+
+ // Step 4: Receive ack and make sure we're done and our listener was
called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.JOINING, membershipManager.state());
+
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(0, listener.assignedCounter.get());
+ assertEquals(1, listener.lostCounter.get());
+ }
+
+ private ConsumerRebalanceListenerInvoker
consumerRebalanceListenerInvoker() {
+ ConsumerCoordinatorMetrics coordinatorMetrics = new
ConsumerCoordinatorMetrics(
+ subscriptionState,
+ new Metrics(),
+ "test-");
+ return new ConsumerRebalanceListenerInvoker(
+ new LogContext(),
+ subscriptionState,
+ new MockTime(1),
+ coordinatorMetrics
+ );
+ }
+
+ private SortedSet<TopicPartition> topicPartitions(String topicName, int...
partitions) {
+ SortedSet<TopicPartition> topicPartitions = new TreeSet<>(new
Utils.TopicPartitionComparator());
+
+ for (int partition : partitions)
+ topicPartitions.add(new TopicPartition(topicName, partition));
+
+ return topicPartitions;
+ }
+
+ private SortedSet<TopicIdPartition> topicIdPartitions(Uuid topicId, String
topicName, int... partitions) {
+ SortedSet<TopicIdPartition> topicIdPartitions = new TreeSet<>(new
Utils.TopicIdPartitionComparator());
+
+ for (int partition : partitions)
+ topicIdPartitions.add(new TopicIdPartition(topicId, new
TopicPartition(topicName, partition)));
+
+ return topicIdPartitions;
+ }
+
+ private void performCallback(MembershipManagerImpl membershipManager,
+ ConsumerRebalanceListenerInvoker invoker,
+ ConsumerRebalanceListenerMethodName
expectedMethodName,
+ SortedSet<TopicPartition> expectedPartitions)
{
+ // We expect only our enqueued event in the background queue.
+ assertEquals(1, backgroundEventQueue.size());
+ assertNotNull(backgroundEventQueue.peek());
+ assertInstanceOf(ConsumerRebalanceListenerCallbackNeededEvent.class,
backgroundEventQueue.peek());
+ ConsumerRebalanceListenerCallbackNeededEvent neededEvent =
(ConsumerRebalanceListenerCallbackNeededEvent) backgroundEventQueue.poll();
+ assertNotNull(neededEvent);
+ assertEquals(expectedMethodName, neededEvent.methodName());
+ assertEquals(expectedPartitions, neededEvent.partitions());
+
+ ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent =
invokeRebalanceCallbacks(
+ invoker,
+ neededEvent.methodName(),
+ neededEvent.partitions(),
+ neededEvent.future()
+ );
Review Comment:
Agreed. The boundaries and scope of the unit and integration tests isn't
consistent. As the codebase has grown, we haven't been as careful as we should
to maintain ease of unit testing 😞
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]