dajac commented on code in PR #14640:
URL: https://github.com/apache/kafka/pull/14640#discussion_r1422136734
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1499,6 +1575,149 @@ private void subscribeInternal(Collection<String>
topics, Optional<ConsumerRebal
}
}
+ /**
+ * This method can be used by cases where the caller has an event that
needs to both block for completion but
+ * also process background events. For some events, in order to fully
process the associated logic, the
+ * {@link ConsumerNetworkThread background thread} needs assistance from
the application thread to complete.
+ * If the application thread simply blocked on the event after submitting
it, the processing would deadlock.
+ * The logic herein is basically a loop that performs two tasks in each
iteration:
+ *
+ * <ol>
+ * <li>Process background events, if any</li>
+ * <li><em>Briefly</em> wait for {@link CompletableApplicationEvent an
event} to complete</li>
+ * </ol>
+ *
+ * <p/>
+ *
+ * Each iteration gives the application thread an opportunity to process
background events, which may be
+ * necessary to complete the overall processing.
+ *
+ * <p/>
+ *
+ * As an example, take {@link #unsubscribe()}. To start unsubscribing, the
application thread enqueues an
+ * {@link UnsubscribeApplicationEvent} on the application event queue.
That event will eventually trigger the
+ * rebalancing logic in the background thread. Critically, as part of this
rebalancing work, the
+ * {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)}
callback needs to be invoked. However,
+ * this callback must be executed on the application thread. To achieve
this, the background thread enqueues a
+ * {@link ConsumerRebalanceListenerCallbackNeededEvent} on its background
event queue. That event queue is
+ * periodically queried by the application thread to see if there's work
to be done. When the application thread
+ * sees {@link ConsumerRebalanceListenerCallbackNeededEvent}, it is
processed, and then a
+ * {@link ConsumerRebalanceListenerCallbackCompletedEvent} is then
enqueued by the application thread on the
+ * background event queue. Moments later, the background thread will see
that event, process it, and continue
+ * execution of the rebalancing logic. The rebalancing logic cannot
complete until the
+ * {@link ConsumerRebalanceListener} callback is performed.
+ *
+ * @param eventProcessor Event processor that contains the queue of events
to process
+ * @param future Event that contains a {@link CompletableFuture};
it is on this future that the
+ * application thread will wait for completion
+ * @param timer Overall timer that bounds how long to wait for
the event to complete
+ * @return {@code true} if the event completed within the timeout, {@code
false} otherwise
+ */
+ // Visible for testing
+ <T> T processBackgroundEvents(EventProcessor<?> eventProcessor,
+ Future<T> future,
+ Timer timer) {
+ log.trace("Will wait up to {} ms for future {} to complete",
timer.remainingMs(), future);
+
+ do {
+ boolean hadEvents = eventProcessor.process();
+
+ try {
+ if (future.isDone()) {
+ // If the event is done (either successfully or
otherwise), go ahead and attempt to return
+ // without waiting. We use the ConsumerUtils.getResult()
method here to handle the conversion
+ // of the exception types.
+ T result = ConsumerUtils.getResult(future);
+ log.trace("Future {} completed successfully", future);
+ return result;
+ } else if (!hadEvents) {
+ // If the above processing yielded no events, then let's
sit tight for a bit to allow the
+ // background thread to either a) finish the task, or b)
populate the background event
+ // queue with things to process in our next loop.
+ Timer pollInterval = time.timer(100L);
+ log.trace("Waiting {} ms for future {} to complete",
pollInterval.remainingMs(), future);
+ T result = ConsumerUtils.getResult(future, pollInterval);
+ log.trace("Future {} completed successfully", future);
+ return result;
+ }
+ } catch (TimeoutException e) {
+ // Ignore this as we will retry the event until the timeout
expires.
+ } finally {
+ timer.update();
+ }
+ } while (timer.notExpired());
+
+ log.trace("Future {} did not complete within timeout", future);
+ throw new TimeoutException("Operation timed out before completion");
+ }
+
+ void updateConsumerGroupMetadata(String newMemberId, int newMemberEpoch) {
Review Comment:
This is new and it seems completely unrelated to the PR. Is there a reason
why we are doing it here? It is usually not recommended to hijack a PR with
unrelated changes like this.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java:
##########
@@ -224,4 +224,21 @@ else if (t instanceof KafkaException)
throw new TimeoutException(e);
}
}
+
+ public static <T> T getResult(Future<T> future) {
+ try {
+ return future.get();
+ } catch (ExecutionException e) {
+ Throwable t = e.getCause();
+
+ if (t instanceof WakeupException)
Review Comment:
`WakeupException` is a `KafkaException`. Is there a reason why we cannot
rely on `if (t instanceof KafkaException)` for it? Assuming that we can
remove it, it may be worth having a `maybeWrap` helper method which would wrap
any non-`KafkaException`. Suggesting this because we could reuse it in the other
`getResult` too.
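
A minimal sketch of what such a helper could look like, offered only as an
illustration. The nested `KafkaException` and `WakeupException` classes are
stand-ins for the real Kafka types so that the snippet compiles on its own,
the class name `MaybeWrapSketch` is hypothetical, and the
`InterruptedException` handling is an assumption, not what `ConsumerUtils`
necessarily does:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class MaybeWrapSketch {

    // Stand-in for org.apache.kafka.common.KafkaException (assumption: only
    // the constructors needed by this sketch are modelled).
    static class KafkaException extends RuntimeException {
        KafkaException(String message) { super(message); }
        KafkaException(Throwable cause) { super(cause); }
    }

    // Stand-in for org.apache.kafka.common.errors.WakeupException, which is
    // a subclass of KafkaException.
    static class WakeupException extends KafkaException {
        WakeupException() { super("wakeup"); }
    }

    // Hypothetical helper: pass any KafkaException (including subclasses such
    // as WakeupException) through untouched and wrap everything else.
    static KafkaException maybeWrap(Throwable t) {
        if (t instanceof KafkaException)
            return (KafkaException) t;
        return new KafkaException(t);
    }

    // With the helper in place, no dedicated WakeupException branch is needed.
    static <T> T getResult(Future<T> future) {
        try {
            return future.get();
        } catch (ExecutionException e) {
            throw maybeWrap(e.getCause());
        } catch (InterruptedException e) {
            // Assumption: restore the interrupt flag and surface a KafkaException.
            Thread.currentThread().interrupt();
            throw new KafkaException(e);
        }
    }
}
```

Because `WakeupException` already satisfies the `instanceof KafkaException`
check, it is rethrown as-is, while something like an `IllegalStateException`
comes back wrapped with the original as its cause.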
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -180,12 +179,12 @@ private void process(final ListOffsetsApplicationEvent
event) {
* consumer join the group if it is not part of it yet, or send the
updated subscription if
* it is already a member.
*/
- private void process(final SubscriptionChangeApplicationEvent event) {
- if (!requestManagers.membershipManager.isPresent()) {
- throw new RuntimeException("Group membership manager not present
when processing a " +
- "subscribe event");
+ private void process(final SubscriptionChangeApplicationEvent ignored) {
+ if (!requestManagers.heartbeatRequestManager.isPresent()) {
Review Comment:
Good point. How about renaming `RequestManagers` to `Managers`? Anyway,
let's tackle it separately.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##########
@@ -790,6 +791,297 @@ public void
testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
verify(membershipManager, never()).transitionToJoining();
}
+ @Test
+ public void testListenerCallbacksBasic() {
+ // Step 1: set up mocks
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ CounterConsumerRebalanceListener listener = new
CounterConsumerRebalanceListener();
+ ConsumerRebalanceListenerInvoker invoker =
consumerRebalanceListenerInvoker();
+
+ String topicName = "topic1";
+ Uuid topicId = Uuid.randomUuid();
+
+
when(subscriptionState.assignedPartitions()).thenReturn(Collections.emptySet());
+ when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+ doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId,
topicName));
+
+ // Step 2: put the state machine into the appropriate... state
+
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId,
topicName));
+ receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+ assertEquals(Collections.emptySet(),
membershipManager.currentAssignment());
+ assertTrue(membershipManager.reconciliationInProgress());
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(0, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 3: assign partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+ topicPartitions(topicName, 0, 1)
+ );
+
+ assertFalse(membershipManager.reconciliationInProgress());
+
+ // Step 4: Send ack and make sure we're done and our listener was
called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.STABLE, membershipManager.state());
+ assertEquals(topicIdPartitions(topicId, topicName, 0, 1),
membershipManager.currentAssignment());
+
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(1, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+
+ // Step 5: receive an empty assignment, which means we should call
revoke
+
when(subscriptionState.assignedPartitions()).thenReturn(topicPartitions(topicName,
0, 1));
+ receiveEmptyAssignment(membershipManager);
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 6: revoke partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED,
+ topicPartitions(topicName, 0, 1)
+ );
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 7: assign partitions should still be called, even though it's
empty
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+ Collections.emptySortedSet()
+ );
+ assertFalse(membershipManager.reconciliationInProgress());
+
+ // Step 8: Send ack and make sure we're done and our listener was
called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.STABLE, membershipManager.state());
+ assertFalse(membershipManager.reconciliationInProgress());
+
+ assertEquals(1, listener.revokedCounter.get());
+ assertEquals(2, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+ }
+
+ // TODO: Reconciliation needs to support when a listener throws an error
on onPartitionsRevoked(). When that
+ // happens, the assignment step is skipped, which means
onPartitionsAssigned() is never run.
+ // The jury is out on whether or not this is a bug or intentional.
+ //
+ // See
https://github.com/apache/kafka/pull/14640#discussion_r1421253120 for more
details.
+ // @Test
+ public void testListenerCallbacksThrowsErrorOnPartitionsRevoked() {
+ // Step 1: set up mocks
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ mockOwnedPartition("topic1", 0);
+ CounterConsumerRebalanceListener listener = new
CounterConsumerRebalanceListener(
+ Optional.of(new IllegalArgumentException("Intentional
onPartitionsRevoked() error")),
+ Optional.empty(),
+ Optional.empty()
+ );
+ ConsumerRebalanceListenerInvoker invoker =
consumerRebalanceListenerInvoker();
+
+
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+ doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
+ // Step 2: put the state machine into the appropriate... state
+ receiveEmptyAssignment(membershipManager);
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+ assertEquals(Collections.emptySet(),
membershipManager.currentAssignment());
+ assertTrue(membershipManager.reconciliationInProgress());
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(0, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 3: revoke partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED,
+ topicPartitions("topic1", 0)
+ );
+
+ assertFalse(membershipManager.reconciliationInProgress());
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+ // Step 4: Send ack and make sure we're done and our listener was
called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+ assertEquals(1, listener.revokedCounter.get());
+ assertEquals(1, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+ }
+
+ @Test
+ public void testListenerCallbacksThrowsErrorOnPartitionsAssigned() {
+ // Step 1: set up mocks
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ mockOwnedPartition("topic1", 0);
+ CounterConsumerRebalanceListener listener = new
CounterConsumerRebalanceListener(
+ Optional.empty(),
+ Optional.of(new IllegalArgumentException("Intentional
onPartitionsAssigned() error")),
+ Optional.empty()
+ );
+ ConsumerRebalanceListenerInvoker invoker =
consumerRebalanceListenerInvoker();
+
+
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+ doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
+ // Step 2: put the state machine into the appropriate... state
+ receiveEmptyAssignment(membershipManager);
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+ assertEquals(Collections.emptySet(),
membershipManager.currentAssignment());
+ assertTrue(membershipManager.reconciliationInProgress());
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(0, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 3: revoke partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED,
+ topicPartitions("topic1", 0)
+ );
+
+ assertTrue(membershipManager.reconciliationInProgress());
+
+ // Step 4: assign partitions
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+ Collections.emptySortedSet()
+ );
+
+ assertFalse(membershipManager.reconciliationInProgress());
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+ // Step 5: Send ack and make sure we're done and our listener was
called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+ assertEquals(1, listener.revokedCounter.get());
+ assertEquals(1, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+ }
+
+ @Test
+ public void testOnPartitionsLostNoError() {
+ mockOwnedPartition("topic1", 0);
+ testOnPartitionsLost(Optional.empty());
+ }
+
+ @Test
+ public void testOnPartitionsLostError() {
+ mockOwnedPartition("topic1", 0);
+ testOnPartitionsLost(Optional.of(new KafkaException("Intentional error
for test")));
+ }
+
+ private void testOnPartitionsLost(Optional<RuntimeException> lostError) {
+ // Step 1: set up mocks
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ CounterConsumerRebalanceListener listener = new
CounterConsumerRebalanceListener(
+ Optional.empty(),
+ Optional.empty(),
+ lostError
+ );
+ ConsumerRebalanceListenerInvoker invoker =
consumerRebalanceListenerInvoker();
+
+
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+ doNothing().when(subscriptionState).markPendingRevocation(anySet());
+
+ // Step 2: put the state machine into the appropriate... state
+ membershipManager.transitionToFenced();
+ assertEquals(MemberState.FENCED, membershipManager.state());
+ assertEquals(Collections.emptySet(),
membershipManager.currentAssignment());
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(0, listener.assignedCounter.get());
+ assertEquals(0, listener.lostCounter.get());
+
+ // Step 3: invoke the callback
+ performCallback(
+ membershipManager,
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST,
+ topicPartitions("topic1", 0)
+ );
+
+ // Step 4: Receive ack and make sure we're done and our listener was
called appropriately
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.JOINING, membershipManager.state());
+
+ assertEquals(0, listener.revokedCounter.get());
+ assertEquals(0, listener.assignedCounter.get());
+ assertEquals(1, listener.lostCounter.get());
+ }
+
+ private ConsumerRebalanceListenerInvoker
consumerRebalanceListenerInvoker() {
+ ConsumerCoordinatorMetrics coordinatorMetrics = new
ConsumerCoordinatorMetrics(
+ subscriptionState,
+ new Metrics(),
+ "test-");
+ return new ConsumerRebalanceListenerInvoker(
+ new LogContext(),
+ subscriptionState,
+ new MockTime(1),
+ coordinatorMetrics
+ );
+ }
+
+ private SortedSet<TopicPartition> topicPartitions(String topicName, int...
partitions) {
+ SortedSet<TopicPartition> topicPartitions = new TreeSet<>(new
Utils.TopicPartitionComparator());
+
+ for (int partition : partitions)
+ topicPartitions.add(new TopicPartition(topicName, partition));
+
+ return topicPartitions;
+ }
+
+ private SortedSet<TopicIdPartition> topicIdPartitions(Uuid topicId, String
topicName, int... partitions) {
+ SortedSet<TopicIdPartition> topicIdPartitions = new TreeSet<>(new
Utils.TopicIdPartitionComparator());
+
+ for (int partition : partitions)
+ topicIdPartitions.add(new TopicIdPartition(topicId, new
TopicPartition(topicName, partition)));
+
+ return topicIdPartitions;
+ }
+
+ private void performCallback(MembershipManagerImpl membershipManager,
+ ConsumerRebalanceListenerInvoker invoker,
+ ConsumerRebalanceListenerMethodName
expectedMethodName,
+ SortedSet<TopicPartition> expectedPartitions)
{
+ // We expect only our enqueued event in the background queue.
+ assertEquals(1, backgroundEventQueue.size());
+ assertNotNull(backgroundEventQueue.peek());
+ assertInstanceOf(ConsumerRebalanceListenerCallbackNeededEvent.class,
backgroundEventQueue.peek());
+ ConsumerRebalanceListenerCallbackNeededEvent neededEvent =
(ConsumerRebalanceListenerCallbackNeededEvent) backgroundEventQueue.poll();
+ assertNotNull(neededEvent);
+ assertEquals(expectedMethodName, neededEvent.methodName());
+ assertEquals(expectedPartitions, neededEvent.partitions());
+
+ ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent =
invokeRebalanceCallbacks(
+ invoker,
+ neededEvent.methodName(),
+ neededEvent.partitions(),
+ neededEvent.future()
+ );
Review Comment:
I am sorry but I still don't get why we really need to call the invoker
here. It seems simpler to just create the event:
```
return new ConsumerRebalanceListenerCallbackCompletedEvent(
methodName,
future,
error
);
```
We would need to pass `error` to `performCallback`. If we do this, we could
remove all the listener-related bits from all the tests and
`CounterConsumerRebalanceListener`. I may be missing something...
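
To make the idea concrete, here is a rough, self-contained sketch of a
`performCallback` that takes the error as a parameter and builds the
completed event directly, without an invoker or a listener. All types here
(`MethodName`, `NeededEvent`, `CompletedEvent`) are simplified stand-ins for
the real event classes, and the signature is an assumption for illustration:

```java
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

final class PerformCallbackSketch {

    // Stand-in for ConsumerRebalanceListenerMethodName.
    enum MethodName { ON_PARTITIONS_REVOKED, ON_PARTITIONS_ASSIGNED, ON_PARTITIONS_LOST }

    // Stand-in for ConsumerRebalanceListenerCallbackNeededEvent.
    static final class NeededEvent {
        final MethodName methodName;
        final CompletableFuture<Void> future = new CompletableFuture<>();

        NeededEvent(MethodName methodName) {
            this.methodName = methodName;
        }
    }

    // Stand-in for ConsumerRebalanceListenerCallbackCompletedEvent.
    static final class CompletedEvent {
        final MethodName methodName;
        final CompletableFuture<Void> future;
        final Optional<RuntimeException> error;

        CompletedEvent(MethodName methodName,
                       CompletableFuture<Void> future,
                       Optional<RuntimeException> error) {
            this.methodName = methodName;
            this.future = future;
            this.error = error;
        }
    }

    // Takes the pending "needed" event off the queue, checks that it is the
    // expected callback, and answers with a completed event that carries the
    // caller-supplied error. No listener is invoked.
    static CompletedEvent performCallback(Queue<NeededEvent> backgroundEventQueue,
                                          MethodName expectedMethodName,
                                          Optional<RuntimeException> error) {
        NeededEvent neededEvent = backgroundEventQueue.poll();
        if (neededEvent == null || neededEvent.methodName != expectedMethodName)
            throw new AssertionError("Expected a pending " + expectedMethodName + " event");
        return new CompletedEvent(neededEvent.methodName, neededEvent.future, error);
    }
}
```

With this shape, a test that wants to simulate a failing callback would pass
`Optional.of(...)` as the error instead of configuring a listener to throw.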
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##########
@@ -790,6 +812,197 @@ public void
testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
verify(membershipManager, never()).transitionToJoining();
}
+ @Test
+ public void testListenerCallbacksBasic() {
Review Comment:
Hmm... @lianetm In this case, it seems that the reconciliation does not fail
when only `onPartitionsRevoked` fails. Is this right? It may be better to just
fail the reconciliation when `onPartitionsRevoked` fails. What do you think?