AndrewJSchofield commented on code in PR #14962:
URL: https://github.com/apache/kafka/pull/14962#discussion_r1431317971
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##########
@@ -73,79 +71,127 @@ public class TopicMetadataRequestManager implements
RequestManager {
public TopicMetadataRequestManager(final LogContext context, final
ConsumerConfig config) {
logContext = context;
log = logContext.logger(getClass());
- inflightRequests = new HashMap<>();
+ inflightRequests = new LinkedList<>();
retryBackoffMs =
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
retryBackoffMaxMs =
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
allowAutoTopicCreation =
config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG);
}
@Override
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
- List<NetworkClientDelegate.UnsentRequest> requests =
inflightRequests.values().stream()
+ // Prune any requests which have timed out
+ List<TopicMetadataRequestState> expiredRequests =
inflightRequests.stream()
+ .filter(req -> req.isExpired(currentTimeMs))
+ .collect(Collectors.toList());
+ expiredRequests.forEach(TopicMetadataRequestState::expire);
+
+ List<NetworkClientDelegate.UnsentRequest> requests =
inflightRequests.stream()
.map(req -> req.send(currentTimeMs))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
+
return requests.isEmpty() ? EMPTY : new
NetworkClientDelegate.PollResult(0, requests);
}
/**
- * return the future of the metadata request. Return the existing future
if a request for the same topic is already
- * inflight.
+ * Return the future of the metadata request.
*
- * @param topic to be requested. If empty, return the metadata for all
topics.
* @return the future of the metadata request.
*/
- public CompletableFuture<Map<String, List<PartitionInfo>>>
requestTopicMetadata(final Optional<String> topic) {
- if (inflightRequests.containsKey(topic)) {
- return inflightRequests.get(topic).future;
- }
+ public CompletableFuture<Map<String, List<PartitionInfo>>>
requestAllTopicsMetadata(final long expirationTimeMs) {
+ TopicMetadataRequestState newRequest = new TopicMetadataRequestState(
+ logContext,
+ expirationTimeMs,
+ retryBackoffMs,
+ retryBackoffMaxMs);
+ inflightRequests.add(newRequest);
+ return newRequest.future;
+ }
+ /**
+ * Return the future of the metadata request.
+ *
+ * @param topic to be requested.
+ * @return the future of the metadata request.
+ */
+ public CompletableFuture<Map<String, List<PartitionInfo>>>
requestTopicMetadata(final String topic, final long expirationTimeMs) {
TopicMetadataRequestState newRequest = new TopicMetadataRequestState(
logContext,
topic,
+ expirationTimeMs,
retryBackoffMs,
retryBackoffMaxMs);
- inflightRequests.put(topic, newRequest);
+ inflightRequests.add(newRequest);
return newRequest.future;
}
// Visible for testing
List<TopicMetadataRequestState> inflightRequests() {
- return new ArrayList<>(inflightRequests.values());
+ return inflightRequests;
}
class TopicMetadataRequestState extends RequestState {
- private final Optional<String> topic;
+ private final String topic;
+ private final boolean allTopics;
+ private final long expirationTimeMs;
CompletableFuture<Map<String, List<PartitionInfo>>> future;
public TopicMetadataRequestState(final LogContext logContext,
- final Optional<String> topic,
+ final long expirationTimeMs,
+ final long retryBackoffMs,
+ final long retryBackoffMaxMs) {
+ super(logContext, TopicMetadataRequestState.class.getSimpleName(),
retryBackoffMs,
+ retryBackoffMaxMs);
+ future = new CompletableFuture<>();
+ this.topic = null;
+ this.allTopics = true;
+ this.expirationTimeMs = expirationTimeMs;
+ }
+
+ public TopicMetadataRequestState(final LogContext logContext,
+ final String topic,
+ final long expirationTimeMs,
final long retryBackoffMs,
final long retryBackoffMaxMs) {
super(logContext, TopicMetadataRequestState.class.getSimpleName(),
retryBackoffMs,
retryBackoffMaxMs);
future = new CompletableFuture<>();
this.topic = topic;
+ this.allTopics = false;
+ this.expirationTimeMs = expirationTimeMs;
}
/**
* prepare the metadata request and return an
* {@link
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}
if needed.
*/
private Optional<NetworkClientDelegate.UnsentRequest> send(final long
currentTimeMs) {
+ if (currentTimeMs >= expirationTimeMs) {
Review Comment:
If whoever is waiting for the metadata has already given up (that is, the
expiration time has passed), there seems to me to be no point in sending the
request any longer.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]