HeartSaVioR commented on pull request #29272:
URL: https://github.com/apache/spark/pull/29272#issuecomment-668333483
I have been getting reports of flaky tests caused by this change - I took a closer
look at the explanation of the Kafka configurations and realized they're not
interchangeable. My bad.
This test (in Kafka) perfectly describes how these configurations work
together (though I'm wondering why `default.api.timeout.ms` itself isn't set here -
the test just passes the value as the per-call timeout parameter, so nothing
actually demonstrates that config's own behavior):
```java
@Test
public void testSuccessfulRetryAfterRequestTimeout() throws Exception {
    HashMap<Integer, Node> nodes = new HashMap<>();
    MockTime time = new MockTime();
    Node node0 = new Node(0, "localhost", 8121);
    nodes.put(0, node0);
    Cluster cluster = new Cluster("mockClusterId", nodes.values(),
        Arrays.asList(new PartitionInfo("foo", 0, node0, new Node[]{node0}, new Node[]{node0})),
        Collections.emptySet(), Collections.emptySet(),
        Collections.emptySet(), nodes.get(0));

    final int requestTimeoutMs = 1000;
    final int retryBackoffMs = 100;
    final int apiTimeoutMs = 3000;

    try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
        AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(retryBackoffMs),
        AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeoutMs))) {
        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());

        final ListTopicsResult result = env.adminClient()
            .listTopics(new ListTopicsOptions().timeoutMs(apiTimeoutMs));

        // Wait until the first attempt has been sent, then advance the time
        TestUtils.waitForCondition(() -> env.kafkaClient().hasInFlightRequests(),
            "Timed out waiting for Metadata request to be sent");
        time.sleep(requestTimeoutMs + 1);

        // Wait for the request to be timed out before backing off
        TestUtils.waitForCondition(() -> !env.kafkaClient().hasInFlightRequests(),
            "Timed out waiting for inFlightRequests to be timed out");
        time.sleep(retryBackoffMs);

        // Since api timeout bound is not hit, AdminClient should retry
        TestUtils.waitForCondition(() -> env.kafkaClient().hasInFlightRequests(),
            "Failed to retry Metadata request");
        env.kafkaClient().respond(prepareMetadataResponse(cluster, Errors.NONE));

        assertEquals(1, result.listings().get().size());
        assertEquals("foo", result.listings().get().iterator().next().name());
    }
}
```
When we make a request without the timeout parameter, we now fall back to the
default value of `default.api.timeout.ms`, which is 60 seconds - longer than the
test timeout. If the request can't be retried, `request.timeout.ms` still fails the
request in the same amount of time (which is intended). But if the request can be
retried, it's no longer the same: the call may be held open until it is timed out
via `default.api.timeout.ms` (which is not expected).
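To make the difference concrete, here is a minimal sketch (not taken from the PR; the class name, bootstrap server, and property values are placeholders): with a short `request.timeout.ms` but the default 60-second `default.api.timeout.ms`, a call issued without an explicit timeout keeps retrying until the API timeout instead of failing after a single request timeout.
```java
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class AdminTimeoutSketch {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Each individual request attempt is bounded by request.timeout.ms (1 second here) ...
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
        // ... but retriable failures are retried until default.api.timeout.ms (60 seconds).
        props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "60000");

        try (Admin admin = Admin.create(props)) {
            long start = System.currentTimeMillis();
            try {
                // No per-call timeout is passed, so default.api.timeout.ms applies:
                // against an unreachable broker this blocks for ~60 seconds, not ~1 second.
                admin.listTopics().names().get();
            } catch (ExecutionException e) {
                System.out.println("listTopics failed after "
                    + (System.currentTimeMillis() - start) + " ms: " + e.getCause());
            }
        }
    }
}
```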
So if the change is intended to make sure `default.api.timeout.ms` >=
`request.timeout.ms` (see
[KAFKA-10318](https://issues.apache.org/jira/browse/KAFKA-10318)), both
configurations should be changed together.
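For example, a hedged sketch of "changing them together" in test code (this is not the PR's actual change; the helper name and values are hypothetical): when the per-request timeout is lowered for tests, lower the API timeout along with it so a retried request cannot outlive the test.
```java
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClientConfig;

// Hypothetical helper: keep the two admin timeouts in sync for test setups.
final class TestAdminConfigs {
    static Properties withTimeout(String bootstrapServers, int timeoutMs) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Per-attempt bound for a single request.
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(timeoutMs));
        // Overall bound across retries; must be >= request.timeout.ms (KAFKA-10318).
        // Keeping them equal means retries cannot extend a call past the intended timeout.
        props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, String.valueOf(timeoutMs));
        return props;
    }
}
```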