lianetm commented on code in PR #17440:
URL: https://github.com/apache/kafka/pull/17440#discussion_r1838719823


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -230,6 +234,10 @@ private ClientRequest makeClientRequest(
             unsent.handler
         );
     }
+    
+    public CompletableFuture<RuntimeException> metaDataError() {

Review Comment:
   Typo: this should be `metadataError`.



##########
core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala:
##########
@@ -574,7 +574,7 @@ object QuorumTestHarness {
   // The following parameter groups are to *temporarily* avoid bugs with the CONSUMER group protocol Consumer
   // implementation that would otherwise cause tests to fail.
   def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_16176: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
-  def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
+  def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly

Review Comment:
   `getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696`
   was meant to be temporary. Instead of repointing it, we should change the
   tests that use
   `@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))`
   to use `@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))`,
   and remove this line. Could you try that to see where we stand on failures?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##########
@@ -256,12 +257,27 @@ public CompletableFuture<Boolean> updateFetchPositions(long deadlineMs) {
                 }
             });
 
+            onMetadataError(metadataError, result);
+
         } catch (Exception e) {
             result.completeExceptionally(maybeWrapAsKafkaException(e));
         }
         return result;
     }
 
+    private void onMetadataError(CompletableFuture<RuntimeException> metadataError,
+                                 CompletableFuture<Boolean> result) {
+        metadataError.whenComplete((__, error) -> {
+            if (error instanceof AuthorizationException && pendingOffsetFetchEvent != null) {

Review Comment:
   With this change we propagate the metadata error to fail the pending
   `OffsetFetch` request. But I expect other requests in this manager need to
   be aware of the metadata error too, e.g. `ListOffsets` (which backs any API
   call to `beginningOffsets`/`endOffsets`/`offsetsForTimes` made without topic
   access).
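
   To illustrate the concern, here is a minimal sketch of failing every pending
   request on a metadata error, not only the `OffsetFetch`. It is not the actual
   `OffsetsRequestManager` code: the class `MetadataErrorPropagationSketch` and
   the methods `track` and `failPending` are hypothetical names, and plain
   `CompletableFuture`s stand in for the manager's real request state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch only; names and structure are assumptions, not Kafka APIs.
public class MetadataErrorPropagationSketch {

    // Stand-in for all in-flight requests (e.g. OffsetFetch, ListOffsets).
    private final List<CompletableFuture<?>> pendingRequests = new ArrayList<>();

    // Registers a request future so a later metadata error can fail it.
    public <T> CompletableFuture<T> track(CompletableFuture<T> request) {
        pendingRequests.add(request);
        return request;
    }

    // Fails every tracked request with the given error.
    // completeExceptionally is a no-op on futures that already completed.
    public void failPending(RuntimeException error) {
        for (CompletableFuture<?> request : pendingRequests) {
            request.completeExceptionally(error);
        }
        pendingRequests.clear();
    }

    public static void main(String[] args) {
        MetadataErrorPropagationSketch manager = new MetadataErrorPropagationSketch();

        CompletableFuture<Boolean> offsetFetch = new CompletableFuture<>();
        CompletableFuture<Long> listOffsets = new CompletableFuture<>();
        manager.track(offsetFetch);
        manager.track(listOffsets);

        // Stand-in for an authorization failure surfaced by a metadata update.
        manager.failPending(new IllegalStateException("not authorized to access topic"));

        System.out.println("offsetFetch failed: " + offsetFetch.isCompletedExceptionally());
        System.out.println("listOffsets failed: " + listOffsets.isCompletedExceptionally());
    }
}
```

   In the PR itself the equivalent would presumably hook into the existing
   `onMetadataError` callback, so that `ListOffsets` callers see the same
   failure as the pending `OffsetFetch`.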


