cadonna commented on code in PR #14779:
URL: https://github.com/apache/kafka/pull/14779#discussion_r1395925569
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -112,6 +112,12 @@ public void testInvalidGroupId() {
assertThrows(InvalidGroupIdException.class, () ->
consumer.committed(new HashSet<>()));
}
+ @Test
+ public void testFailOnClosedConsumer() {
+ consumer.close();
+ assertThrows(IllegalStateException.class, consumer::assignment);
Review Comment:
Could you please also verify the exception message?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1068,12 +1168,17 @@ public void onComplete(Map<TopicPartition,
OffsetAndMetadata> offsets, Exception
@Override
public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
- backgroundEventProcessor.process();
+ acquireAndEnsureOpen();
Review Comment:
I assume this is because `updateAssignmentMetadataIfNeeded()` is not part of
the public interface but of the internal `ConsumerDelegate` interface. In
production code it is only used in `poll()` which is already wrapped by the
lock.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]