This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push: new e501f52 KAFKA-6928: Refactor StreamsPartitionAssignor retry logic (#6085) e501f52 is described below commit e501f5273032839d5e4b580a9fcef4fac0188970 Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Fri Jan 4 10:49:08 2019 -0800 KAFKA-6928: Refactor StreamsPartitionAssignor retry logic (#6085) 1. The retry loop of the InternalTopicManager would just be: a) describe topics, and exclude those which already exist with the right num.partitions, b) for the remaining topics, try to create them. Remove any inner loops. 2. In CreateTopicResponse and MetadataResponse (for describe topic), handle the special error codes TopicExists and UnknownTopicOrPartition in order to retry in the next loop. 3. Do not handle TimeoutException since it should already have been handled inside AdminClient. Add corresponding unit tests for a) topic marked for deletion but not complete yet, in which case metadata response would not contain this topic, but create topic would return error TopicExists; b) requests that keep timing out. Reviewers: Matthias J. 
Sax <matth...@confluent.io> --- .../kafka/clients/admin/MockAdminClient.java | 17 +- .../processor/internals/InternalTopicManager.java | 219 ++++++++++----------- .../internals/InternalTopicManagerTest.java | 31 ++- 3 files changed, 140 insertions(+), 127 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index b5131ae..9fe1ba4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -108,6 +108,14 @@ public class MockAdminClient extends AdminClient { allTopics.put(name, new TopicMetadata(internal, partitions, configs)); } + public void markTopicForDeletion(final String name) { + if (!allTopics.containsKey(name)) { + throw new IllegalArgumentException(String.format("Topic %s did not exist.", name)); + } + + allTopics.get(name).markedForDeletion = true; + } + public void timeoutNextRequest(int numberOfRequest) { timeoutNextRequests = numberOfRequest; } @@ -167,7 +175,7 @@ public class MockAdminClient extends AdminClient { int numberOfPartitions = newTopic.numPartitions(); List<TopicPartitionInfo> partitions = new ArrayList<>(numberOfPartitions); for (int p = 0; p < numberOfPartitions; ++p) { - partitions.add(new TopicPartitionInfo(p, brokers.get(0), replicas, Collections.<Node>emptyList())); + partitions.add(new TopicPartitionInfo(p, brokers.get(0), replicas, Collections.emptyList())); } allTopics.put(topicName, new TopicMetadata(false, partitions, newTopic.configs())); future.complete(null); @@ -217,7 +225,7 @@ public class MockAdminClient extends AdminClient { for (String requestedTopic : topicNames) { for (Map.Entry<String, TopicMetadata> topicDescription : allTopics.entrySet()) { String topicName = topicDescription.getKey(); - if (topicName.equals(requestedTopic)) { + if (topicName.equals(requestedTopic) && 
!topicDescription.getValue().markedForDeletion) { TopicMetadata topicMetadata = topicDescription.getValue(); KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>(); future.complete(new TopicDescription(topicName, topicMetadata.isInternalTopic, topicMetadata.partitions)); @@ -386,12 +394,15 @@ public class MockAdminClient extends AdminClient { final List<TopicPartitionInfo> partitions; final Map<String, String> configs; + public boolean markedForDeletion; + TopicMetadata(boolean isInternalTopic, List<TopicPartitionInfo> partitions, Map<String, String> configs) { this.isInternalTopic = isInternalTopic; this.partitions = partitions; - this.configs = configs != null ? configs : Collections.<String, String>emptyMap(); + this.configs = configs != null ? configs : Collections.emptyMap(); + this.markedForDeletion = false; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java index 7e35126..40c25d1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java @@ -23,16 +23,15 @@ import org.apache.kafka.clients.admin.DescribeTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.KafkaFuture; -import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.LeaderNotAvailableException; import org.apache.kafka.common.errors.TopicExistsException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; import org.slf4j.Logger; -import 
java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -95,173 +94,155 @@ public class InternalTopicManager { * If a topic exists already but has different number of partitions we fail and throw exception requesting user to reset the app before restarting again. */ public void makeReady(final Map<String, InternalTopicConfig> topics) { - final Map<String, Integer> existingTopicPartitions = getNumPartitions(topics.keySet()); - final Set<InternalTopicConfig> topicsToBeCreated = validateTopicPartitions(topics.values(), existingTopicPartitions); - if (topicsToBeCreated.size() > 0) { - final Set<NewTopic> newTopics = new HashSet<>(); + // we will do the validation / topic-creation in a loop, until we have confirmed all topics + // have existed with the expected number of partitions, or some create topic returns fatal errors. - for (final InternalTopicConfig internalTopicConfig : topicsToBeCreated) { - final Map<String, String> topicConfig = internalTopicConfig.getProperties(defaultTopicConfigs, windowChangeLogAdditionalRetention); + int remainingRetries = retries; + Set<String> topicsNotReady = new HashSet<>(topics.keySet()); - log.debug("Going to create topic {} with {} partitions and config {}.", - internalTopicConfig.name(), - internalTopicConfig.numberOfPartitions(), - topicConfig); + while (!topicsNotReady.isEmpty() && remainingRetries >= 0) { + topicsNotReady = validateTopics(topicsNotReady, topics); + + if (topicsNotReady.size() > 0) { + final Set<NewTopic> newTopics = new HashSet<>(); - newTopics.add( - new NewTopic( + for (final String topicName : topicsNotReady) { + final InternalTopicConfig internalTopicConfig = Utils.notNull(topics.get(topicName)); + final Map<String, String> topicConfig = internalTopicConfig.getProperties(defaultTopicConfigs, windowChangeLogAdditionalRetention); + + log.debug("Going to create topic {} with {} partitions and config {}.", internalTopicConfig.name(), 
internalTopicConfig.numberOfPartitions(), - replicationFactor) - .configs(topicConfig)); - } + topicConfig); - // TODO: KAFKA-6928. should not need retries in the outer caller as it will be retried internally in admin client - int remainingRetries = retries; - boolean retryBackOff = false; - boolean retry; - do { - retry = false; + newTopics.add( + new NewTopic( + internalTopicConfig.name(), + internalTopicConfig.numberOfPartitions(), + replicationFactor) + .configs(topicConfig)); + } final CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopics); - final Set<String> createdTopicNames = new HashSet<>(); for (final Map.Entry<String, KafkaFuture<Void>> createTopicResult : createTopicsResult.values().entrySet()) { + final String topicName = createTopicResult.getKey(); try { - if (retryBackOff) { - retryBackOff = false; - Thread.sleep(retryBackOffMs); - } createTopicResult.getValue().get(); - createdTopicNames.add(createTopicResult.getKey()); - } catch (final ExecutionException couldNotCreateTopic) { - final Throwable cause = couldNotCreateTopic.getCause(); - final String topicName = createTopicResult.getKey(); - - if (cause instanceof TimeoutException) { - retry = true; - log.debug("Could not get number of partitions for topic {} due to timeout. " + - "Will try again (remaining retries {}).", topicName, remainingRetries - 1); - } else if (cause instanceof TopicExistsException) { - // This topic didn't exist earlier, it might be marked for deletion or it might differ - // from the desired setup. It needs re-validation. 
- final Map<String, Integer> existingTopicPartition = getNumPartitions(Collections.singleton(topicName)); - - if (existingTopicPartition.containsKey(topicName) - && validateTopicPartitions(Collections.singleton(topics.get(topicName)), existingTopicPartition).isEmpty()) { - createdTopicNames.add(createTopicResult.getKey()); - log.info("Topic {} exists already and has the right number of partitions: {}", - topicName, - couldNotCreateTopic.toString()); - } else { - retry = true; - retryBackOff = true; - log.info("Could not create topic {}. Topic is probably marked for deletion (number of partitions is unknown).\n" + - "Will retry to create this topic in {} ms (to let broker finish async delete operation first).\n" + - "Error message was: {}", topicName, retryBackOffMs, couldNotCreateTopic.toString()); - } - } else { - throw new StreamsException(String.format("Could not create topic %s.", topicName), - couldNotCreateTopic); - } + topicsNotReady.remove(topicName); } catch (final InterruptedException fatalException) { + // this should not happen; if it ever happens it indicate a bug Thread.currentThread().interrupt(); log.error(INTERRUPTED_ERROR_MESSAGE, fatalException); throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException); + } catch (final ExecutionException executionException) { + final Throwable cause = executionException.getCause(); + if (cause instanceof TopicExistsException) { + // This topic didn't exist earlier or its leader not known before; just retain it for next round of validation. + log.info("Could not create topic {}. 
Topic is probably marked for deletion (number of partitions is unknown).\n" + + "Will retry to create this topic in {} ms (to let broker finish async delete operation first).\n" + + "Error message was: {}", topicName, retryBackOffMs, cause.toString()); + } else { + log.error("Unexpected error during topic creation for {}.\n" + + "Error message was: {}", topicName, cause.toString()); + throw new StreamsException(String.format("Could not create topic %s.", topicName), cause); + } } } + } - if (retry) { - newTopics.removeIf(newTopic -> createdTopicNames.contains(newTopic.name())); - continue; + if (!topicsNotReady.isEmpty()) { + log.info("Topics {} can not be made ready with {} retries left", topicsNotReady, retries); + + try { + Thread.sleep(retryBackOffMs); + } catch (final InterruptedException e) { + // this is okay, we just wake up early + Thread.currentThread().interrupt(); } - return; - } while (remainingRetries-- > 0); + remainingRetries--; + } + } - final String timeoutAndRetryError = "Could not create topics. " + + if (!topicsNotReady.isEmpty()) { + final String timeoutAndRetryError = String.format("Could not create topics after %d retries. " + "This can happen if the Kafka cluster is temporary not available. " + - "You can increase admin client config `retries` to be resilient against this error."; + "You can increase admin client config `retries` to be resilient against this error.", retries); log.error(timeoutAndRetryError); throw new StreamsException(timeoutAndRetryError); } } /** - * Get the number of partitions for the given topics + * Try to get the number of partitions for the given topics; return the number of partitions for topics that already exists. 
+ * + * Topics that were not able to get its description will simply not be returned */ // visible for testing protected Map<String, Integer> getNumPartitions(final Set<String> topics) { log.debug("Trying to check if topics {} have been created with expected number of partitions.", topics); - // TODO: KAFKA-6928. should not need retries in the outer caller as it will be retried internally in admin client - int remainingRetries = retries; - boolean retry; - do { - retry = false; - - final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topics); - final Map<String, KafkaFuture<TopicDescription>> futures = describeTopicsResult.values(); - - final Map<String, Integer> existingNumberOfPartitionsPerTopic = new HashMap<>(); - for (final Map.Entry<String, KafkaFuture<TopicDescription>> topicFuture : futures.entrySet()) { - try { - final TopicDescription topicDescription = topicFuture.getValue().get(); - existingNumberOfPartitionsPerTopic.put( - topicFuture.getKey(), - topicDescription.partitions().size()); - } catch (final InterruptedException fatalException) { - Thread.currentThread().interrupt(); - log.error(INTERRUPTED_ERROR_MESSAGE, fatalException); - throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException); - } catch (final ExecutionException couldNotDescribeTopicException) { - final Throwable cause = couldNotDescribeTopicException.getCause(); - if (cause instanceof TimeoutException) { - retry = true; - log.debug("Could not get number of partitions for topic {} due to timeout. 
" + - "Will try again (remaining retries {}).", topicFuture.getKey(), remainingRetries - 1); - } else { - final String error = "Could not get number of partitions for topic {} due to {}"; - log.debug(error, topicFuture.getKey(), cause.toString()); - } + final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topics); + final Map<String, KafkaFuture<TopicDescription>> futures = describeTopicsResult.values(); + + final Map<String, Integer> existedTopicPartition = new HashMap<>(); + for (final Map.Entry<String, KafkaFuture<TopicDescription>> topicFuture : futures.entrySet()) { + final String topicName = topicFuture.getKey(); + try { + final TopicDescription topicDescription = topicFuture.getValue().get(); + existedTopicPartition.put( + topicFuture.getKey(), + topicDescription.partitions().size()); + } catch (final InterruptedException fatalException) { + // this should not happen; if it ever happens it indicate a bug + Thread.currentThread().interrupt(); + log.error(INTERRUPTED_ERROR_MESSAGE, fatalException); + throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException); + } catch (final ExecutionException couldNotDescribeTopicException) { + final Throwable cause = couldNotDescribeTopicException.getCause(); + if (cause instanceof UnknownTopicOrPartitionException || + cause instanceof LeaderNotAvailableException) { + // This topic didn't exist or leader is not known yet, proceed to try to create it + log.debug("Topic {} is unknown, hence not existed yet.", topicName); + } else { + log.error("Unexpected error during topic description for {}.\n" + + "Error message was: {}", topicName, cause.toString()); + throw new StreamsException(String.format("Could not create topic %s.", topicName), cause); } } + } - if (retry) { - topics.removeAll(existingNumberOfPartitionsPerTopic.keySet()); - continue; - } - - return existingNumberOfPartitionsPerTopic; - } while (remainingRetries-- > 0); - - return Collections.emptyMap(); + return 
existedTopicPartition; } /** - * Check the existing topics to have correct number of partitions; and return the non existing topics to be created + * Check the existing topics to have correct number of partitions; and return the remaining topics that needs to be created */ - private Set<InternalTopicConfig> validateTopicPartitions(final Collection<InternalTopicConfig> topicsPartitionsMap, - final Map<String, Integer> existingTopicNamesPartitions) { - final Set<InternalTopicConfig> topicsToBeCreated = new HashSet<>(); - for (final InternalTopicConfig topic : topicsPartitionsMap) { - final int numberOfPartitions = topic.numberOfPartitions(); - if (existingTopicNamesPartitions.containsKey(topic.name())) { - if (!existingTopicNamesPartitions.get(topic.name()).equals(numberOfPartitions)) { + private Set<String> validateTopics(final Set<String> topicsToValidate, + final Map<String, InternalTopicConfig> topicsMap) { + + final Map<String, Integer> existedTopicPartition = getNumPartitions(topicsToValidate); + + final Set<String> topicsToCreate = new HashSet<>(); + for (final Map.Entry<String, InternalTopicConfig> entry : topicsMap.entrySet()) { + final String topicName = entry.getKey(); + final int numberOfPartitions = entry.getValue().numberOfPartitions(); + if (existedTopicPartition.containsKey(topicName)) { + if (!existedTopicPartition.get(topicName).equals(numberOfPartitions)) { final String errorMsg = String.format("Existing internal topic %s has invalid partitions: " + "expected: %d; actual: %d. 
" + "Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.", - topic.name(), numberOfPartitions, existingTopicNamesPartitions.get(topic.name())); + topicName, numberOfPartitions, existedTopicPartition.get(topicName)); log.error(errorMsg); throw new StreamsException(errorMsg); } } else { - topicsToBeCreated.add(topic); + topicsToCreate.add(topicName); } } - return topicsToBeCreated; + return topicsToCreate; } - } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java index 344dc05..e91bf32 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; @@ -39,6 +40,7 @@ import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class InternalTopicManagerTest { @@ -166,28 +168,47 @@ public class InternalTopicManagerTest { mockAdminClient, new StreamsConfig(config)); - final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic, Collections.<String, String>emptyMap()); + final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic, Collections.emptyMap()); internalTopicConfig.setNumberOfPartitions(1); 
internalTopicManager2.makeReady(Collections.singletonMap(topic, internalTopicConfig)); } @Test public void shouldNotThrowExceptionForEmptyTopicMap() { - internalTopicManager.makeReady(Collections.<String, InternalTopicConfig>emptyMap()); + internalTopicManager.makeReady(Collections.emptyMap()); } @Test public void shouldExhaustRetriesOnTimeoutExceptionForMakeReady() { - mockAdminClient.timeoutNextRequest(4); + mockAdminClient.timeoutNextRequest(1); - final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic, Collections.<String, String>emptyMap()); + final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic, Collections.emptyMap()); + internalTopicConfig.setNumberOfPartitions(1); + try { + internalTopicManager.makeReady(Collections.singletonMap(topic, internalTopicConfig)); + fail("Should have thrown StreamsException."); + } catch (final StreamsException expected) { + assertEquals(TimeoutException.class, expected.getCause().getClass()); + } + } + + @Test + public void shouldExhaustRetriesOnMarkedForDeletionTopic() { + mockAdminClient.addTopic( + false, + topic, + Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())), + null); + mockAdminClient.markTopicForDeletion(topic); + + final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic, Collections.emptyMap()); internalTopicConfig.setNumberOfPartitions(1); try { internalTopicManager.makeReady(Collections.singletonMap(topic, internalTopicConfig)); fail("Should have thrown StreamsException."); } catch (final StreamsException expected) { assertNull(expected.getCause()); - assertEquals("Could not create topics. This can happen if the Kafka cluster is temporary not available. You can increase admin client config `retries` to be resilient against this error.", expected.getMessage()); + assertTrue(expected.getMessage().startsWith("Could not create topics after 1 retries")); } }