lianetm commented on code in PR #17440: URL: https://github.com/apache/kafka/pull/17440#discussion_r1838719823
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -230,6 +234,10 @@ private ClientRequest makeClientRequest(
             unsent.handler
         );
     }
+
+    public CompletableFuture<RuntimeException> metaDataError() {

Review Comment:
   typo: `metadataError`

##########
core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala:
##########
@@ -574,7 +574,7 @@ object QuorumTestHarness {
   // The following parameter groups are to *temporarily* avoid bugs with the CONSUMER group protocol Consumer
   // implementation that would otherwise cause tests to fail.
   def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_16176: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
-  def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
+  def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly

Review Comment:
   This `getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696` was meant to be temporary, so instead of this change we should update the tests that use `@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))` to use `@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))`, and remove this line. Could you try that to see where we are regarding failures?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -256,12 +257,27 @@ public CompletableFuture<Boolean> updateFetchPositions(long deadlineMs) {
                 }
             });
+            onMetadataError(metadataError, result);
+
         } catch (Exception e) {
             result.completeExceptionally(maybeWrapAsKafkaException(e));
         }
         return result;
     }
+    private void onMetadataError(CompletableFuture<RuntimeException> metadataError,
+                                 CompletableFuture<Boolean> result) {
+        metadataError.whenComplete((__, error) -> {
+            if (error instanceof AuthorizationException && pendingOffsetFetchEvent != null) {

Review Comment:
   With this change we're propagating the metadata error to fail the pending `OffsetFetch` request. But I expect other requests in this manager need to be aware of the metadata error too, e.g. `ListOffsets` (that would be the case for any API call to `beginningOffsets`/`endOffsets`/`offsetsForTimes` without topic access).
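
The concern in the `OffsetsRequestManager` comment could be sketched roughly as below: one metadata authorization error fails every pending request, not only the pending `OffsetFetch`. This is a minimal illustration and not the PR's implementation. The class `MetadataErrorFanOut`, its `track` method, and the local `AuthorizationException` stand-in are all hypothetical names introduced here; only `CompletableFuture` is the real JDK API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: fan a metadata authorization error out to all pending requests.
class MetadataErrorFanOut {

    // Stand-in for Kafka's AuthorizationException (assumption, so the sketch is self-contained).
    static class AuthorizationException extends RuntimeException {
        AuthorizationException(String message) {
            super(message);
        }
    }

    // Futures of in-flight requests that depend on topic metadata
    // (e.g. one for OffsetFetch, one for ListOffsets).
    private final List<CompletableFuture<?>> pending = new ArrayList<>();

    // Register a request future so it can be failed on a metadata error.
    <T> CompletableFuture<T> track(CompletableFuture<T> request) {
        pending.add(request);
        return request;
    }

    // Fail every tracked request when the error is an authorization error;
    // other errors are ignored here.
    void onMetadataError(RuntimeException error) {
        if (error instanceof AuthorizationException) {
            for (CompletableFuture<?> request : pending) {
                // No-op for futures that have already completed.
                request.completeExceptionally(error);
            }
            pending.clear();
        }
    }

    public static void main(String[] args) {
        MetadataErrorFanOut manager = new MetadataErrorFanOut();
        CompletableFuture<Boolean> offsetFetch = manager.track(new CompletableFuture<Boolean>());
        CompletableFuture<Long> listOffsets = manager.track(new CompletableFuture<Long>());

        manager.onMetadataError(new AuthorizationException("topic access denied"));

        System.out.println("offsetFetch failed: " + offsetFetch.isCompletedExceptionally());
        System.out.println("listOffsets failed: " + listOffsets.isCompletedExceptionally());
    }
}
```

Tracking all dependent futures in one place means a caller of `beginningOffsets`, `endOffsets`, or `offsetsForTimes` would see the same failure as a caller waiting on `OffsetFetch`, rather than waiting until its own timeout.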