HeartSaVioR commented on pull request #29272:
URL: https://github.com/apache/spark/pull/29272#issuecomment-668333483


   I have been getting reports of flaky tests caused by this change. I took a 
closer look at the documentation of the Kafka configurations and realized 
they're not interchangeable. My bad.
   
   This test (in Kafka) describes exactly how these configurations work 
together. (I'm not sure why it passes the value through the timeout parameter 
rather than setting `default.api.timeout.ms`, but that doesn't change the 
behavior it demonstrates.)
   
   ```
       @Test
       public void testSuccessfulRetryAfterRequestTimeout() throws Exception {
           HashMap<Integer, Node> nodes = new HashMap<>();
           MockTime time = new MockTime();
           Node node0 = new Node(0, "localhost", 8121);
           nodes.put(0, node0);
           Cluster cluster = new Cluster("mockClusterId", nodes.values(),
                   Arrays.asList(new PartitionInfo("foo", 0, node0, new Node[]{node0}, new Node[]{node0})),
                   Collections.emptySet(), Collections.emptySet(),
                   Collections.emptySet(), nodes.get(0));
   
           final int requestTimeoutMs = 1000;
           final int retryBackoffMs = 100;
           final int apiTimeoutMs = 3000;
   
           try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
                   AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(retryBackoffMs),
                   AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeoutMs))) {
               env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
   
               final ListTopicsResult result = env.adminClient()
                       .listTopics(new ListTopicsOptions().timeoutMs(apiTimeoutMs));
   
               // Wait until the first attempt has been sent, then advance the time
               TestUtils.waitForCondition(() -> env.kafkaClient().hasInFlightRequests(),
                       "Timed out waiting for Metadata request to be sent");
               time.sleep(requestTimeoutMs + 1);
   
               // Wait for the request to be timed out before backing off
               TestUtils.waitForCondition(() -> !env.kafkaClient().hasInFlightRequests(),
                       "Timed out waiting for inFlightRequests to be timed out");
               time.sleep(retryBackoffMs);
   
               // Since api timeout bound is not hit, AdminClient should retry
               TestUtils.waitForCondition(() -> env.kafkaClient().hasInFlightRequests(),
                       "Failed to retry Metadata request");
               env.kafkaClient().respond(prepareMetadataResponse(cluster, Errors.NONE));
   
               assertEquals(1, result.listings().get().size());
               assertEquals("foo", result.listings().get().iterator().next().name());
           }
       }
   ```
   
   When we issue a request without a timeout parameter, we now fall back to the 
default value of `default.api.timeout.ms`, which is 60 seconds, longer than the 
test timeout. If the request can't be retried, `request.timeout.ms` still makes 
it fail after the same amount of time as before (which is intended). But if the 
request can be retried, the behavior is no longer the same: the client may keep 
retrying the request until the call times out via `default.api.timeout.ms` 
(which is not expected).
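
To make the difference concrete, here is a rough sketch of the timing, assuming a deliberately simplified model in which every attempt goes unanswered. `TimeoutSketch` and `elapsedUntilFailureMs` are hypothetical names for illustration only; this is not how the Kafka client is implemented, just the arithmetic behind the paragraph above.

```java
// Simplified model (an assumption, not Kafka code): each attempt gets no
// response, so it ends either at its per-request timeout or at the overall
// API deadline, whichever comes first.
final class TimeoutSketch {

    /** Returns the simulated time (ms) at which the call finally fails. */
    static long elapsedUntilFailureMs(long requestTimeoutMs, long retryBackoffMs,
                                      long apiTimeoutMs, boolean retriable) {
        if (requestTimeoutMs <= 0 || retryBackoffMs < 0 || apiTimeoutMs <= 0) {
            throw new IllegalArgumentException("timeouts must be positive, backoff non-negative");
        }
        long now = 0;
        while (true) {
            // The attempt is bounded by request.timeout.ms and by the API deadline.
            now = Math.min(now + requestTimeoutMs, apiTimeoutMs);
            if (!retriable || now >= apiTimeoutMs) {
                return now;
            }
            // Back off before the next attempt; the deadline may expire meanwhile.
            now += retryBackoffMs;
            if (now >= apiTimeoutMs) {
                return apiTimeoutMs;
            }
        }
    }

    public static void main(String[] args) {
        // Non-retriable: bounded by the request timeout alone.
        System.out.println(elapsedUntilFailureMs(1000, 100, 60000, false));
        // Retriable: keeps retrying until the API timeout is reached.
        System.out.println(elapsedUntilFailureMs(1000, 100, 60000, true));
    }
}
```

In this model a non-retriable request fails after `request.timeout.ms`, while a retriable one is held until the API timeout, which is the 60-second default when no timeout parameter is passed.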
   
   So if the change is intended to make sure `default.api.timeout.ms` >= 
`request.timeout.ms` (see 
[KAFKA-10318](https://issues.apache.org/jira/browse/KAFKA-10318)), both 
configurations should be changed together.
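
As an illustration of changing both together, a minimal sketch might build the client properties like this. `TimeoutConfigSketch`, `withTimeout`, and the 3000 ms value are assumptions for illustration; only the two configuration keys come from the discussion above, and the actual change in the Spark test code may look different.

```java
import java.util.Properties;

// Hypothetical helper: sets both timeouts to the same value so that
// default.api.timeout.ms >= request.timeout.ms always holds.
final class TimeoutConfigSketch {

    static Properties withTimeout(int timeoutMs) {
        if (timeoutMs <= 0) {
            throw new IllegalArgumentException("timeoutMs must be positive");
        }
        Properties props = new Properties();
        // Bounds a single request attempt.
        props.setProperty("request.timeout.ms", String.valueOf(timeoutMs));
        // Bounds the whole API call, including retries.
        props.setProperty("default.api.timeout.ms", String.valueOf(timeoutMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = withTimeout(3000); // illustrative value only
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Setting both keys to the same value keeps retriable and non-retriable requests failing within the same bound.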

