This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new e501f52  KAFKA-6928: Refactor StreamsPartitionAssignor retry logic 
(#6085)
e501f52 is described below

commit e501f5273032839d5e4b580a9fcef4fac0188970
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Fri Jan 4 10:49:08 2019 -0800

    KAFKA-6928: Refactor StreamsPartitionAssignor retry logic (#6085)
    
    1. The retry loop of the InternalTopicManager would just be: a) describe 
topics, and exclude those which already exist with the right num.partitions, b) 
for the remaining topics, try to create them. Remove any inner loops.
    
    2. In CreateTopicResponse and MetadataResponse (for describe topic), handle the special error codes TopicExists and UnknownTopicOrPartition so that the affected topics are retried in the next loop iteration.
    
    3. Do not handle TimeoutException, since it should already have been handled inside AdminClient.
    
    Add corresponding unit tests for a) a topic that is marked for deletion but whose deletion has not completed yet, in which case the metadata response would not contain this topic but create topic would return the TopicExists error; b) requests that keep timing out.
    
    Reviewers: Matthias J. Sax <matth...@confluent.io>
---
 .../kafka/clients/admin/MockAdminClient.java       |  17 +-
 .../processor/internals/InternalTopicManager.java  | 219 ++++++++++-----------
 .../internals/InternalTopicManagerTest.java        |  31 ++-
 3 files changed, 140 insertions(+), 127 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index b5131ae..9fe1ba4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -108,6 +108,14 @@ public class MockAdminClient extends AdminClient {
         allTopics.put(name, new TopicMetadata(internal, partitions, configs));
     }
 
+    public void markTopicForDeletion(final String name) {
+        if (!allTopics.containsKey(name)) {
+            throw new IllegalArgumentException(String.format("Topic %s did not exist.", name));
+        }
+
+        allTopics.get(name).markedForDeletion = true;
+    }
+
     public void timeoutNextRequest(int numberOfRequest) {
         timeoutNextRequests = numberOfRequest;
     }
@@ -167,7 +175,7 @@ public class MockAdminClient extends AdminClient {
             int numberOfPartitions = newTopic.numPartitions();
             List<TopicPartitionInfo> partitions = new ArrayList<>(numberOfPartitions);
             for (int p = 0; p < numberOfPartitions; ++p) {
-                partitions.add(new TopicPartitionInfo(p, brokers.get(0), replicas, Collections.<Node>emptyList()));
+                partitions.add(new TopicPartitionInfo(p, brokers.get(0), replicas, Collections.emptyList()));
             }
             allTopics.put(topicName, new TopicMetadata(false, partitions, newTopic.configs()));
             future.complete(null);
@@ -217,7 +225,7 @@ public class MockAdminClient extends AdminClient {
         for (String requestedTopic : topicNames) {
             for (Map.Entry<String, TopicMetadata> topicDescription : allTopics.entrySet()) {
                 String topicName = topicDescription.getKey();
-                if (topicName.equals(requestedTopic)) {
+                if (topicName.equals(requestedTopic) && !topicDescription.getValue().markedForDeletion) {
                     TopicMetadata topicMetadata = topicDescription.getValue();
                     KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
                     future.complete(new TopicDescription(topicName, topicMetadata.isInternalTopic, topicMetadata.partitions));
@@ -386,12 +394,15 @@ public class MockAdminClient extends AdminClient {
         final List<TopicPartitionInfo> partitions;
         final Map<String, String> configs;
 
+        public boolean markedForDeletion;
+
         TopicMetadata(boolean isInternalTopic,
                       List<TopicPartitionInfo> partitions,
                       Map<String, String> configs) {
             this.isInternalTopic = isInternalTopic;
             this.partitions = partitions;
-            this.configs = configs != null ? configs : Collections.<String, String>emptyMap();
+            this.configs = configs != null ? configs : Collections.emptyMap();
+            this.markedForDeletion = false;
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 7e35126..40c25d1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -23,16 +23,15 @@ import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.slf4j.Logger;
 
-import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -95,173 +94,155 @@ public class InternalTopicManager {
      * If a topic exists already but has different number of partitions we fail and throw exception requesting user to reset the app before restarting again.
      */
     public void makeReady(final Map<String, InternalTopicConfig> topics) {
-        final Map<String, Integer> existingTopicPartitions = getNumPartitions(topics.keySet());
-        final Set<InternalTopicConfig> topicsToBeCreated = validateTopicPartitions(topics.values(), existingTopicPartitions);
-        if (topicsToBeCreated.size() > 0) {
-            final Set<NewTopic> newTopics = new HashSet<>();
+        // we will do the validation / topic-creation in a loop, until we have confirmed all topics
+        // have existed with the expected number of partitions, or some create topic returns fatal errors.
 
-            for (final InternalTopicConfig internalTopicConfig : topicsToBeCreated) {
-                final Map<String, String> topicConfig = internalTopicConfig.getProperties(defaultTopicConfigs, windowChangeLogAdditionalRetention);
+        int remainingRetries = retries;
+        Set<String> topicsNotReady = new HashSet<>(topics.keySet());
 
-                log.debug("Going to create topic {} with {} partitions and config {}.",
-                        internalTopicConfig.name(),
-                        internalTopicConfig.numberOfPartitions(),
-                        topicConfig);
+        while (!topicsNotReady.isEmpty() && remainingRetries >= 0) {
+            topicsNotReady = validateTopics(topicsNotReady, topics);
+
+            if (topicsNotReady.size() > 0) {
+                final Set<NewTopic> newTopics = new HashSet<>();
 
-                newTopics.add(
-                    new NewTopic(
+                for (final String topicName : topicsNotReady) {
+                    final InternalTopicConfig internalTopicConfig = Utils.notNull(topics.get(topicName));
+                    final Map<String, String> topicConfig = internalTopicConfig.getProperties(defaultTopicConfigs, windowChangeLogAdditionalRetention);
+
+                    log.debug("Going to create topic {} with {} partitions and config {}.",
                         internalTopicConfig.name(),
                         internalTopicConfig.numberOfPartitions(),
-                        replicationFactor)
-                    .configs(topicConfig));
-            }
+                        topicConfig);
 
-            // TODO: KAFKA-6928. should not need retries in the outer caller as it will be retried internally in admin client
-            int remainingRetries = retries;
-            boolean retryBackOff = false;
-            boolean retry;
-            do {
-                retry = false;
+                    newTopics.add(
+                        new NewTopic(
+                            internalTopicConfig.name(),
+                            internalTopicConfig.numberOfPartitions(),
+                            replicationFactor)
+                            .configs(topicConfig));
+                }
 
                 final CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopics);
 
-                final Set<String> createdTopicNames = new HashSet<>();
                 for (final Map.Entry<String, KafkaFuture<Void>> createTopicResult : createTopicsResult.values().entrySet()) {
+                    final String topicName = createTopicResult.getKey();
                     try {
-                        if (retryBackOff) {
-                            retryBackOff = false;
-                            Thread.sleep(retryBackOffMs);
-                        }
                         createTopicResult.getValue().get();
-                        createdTopicNames.add(createTopicResult.getKey());
-                    } catch (final ExecutionException couldNotCreateTopic) {
-                        final Throwable cause = couldNotCreateTopic.getCause();
-                        final String topicName = createTopicResult.getKey();
-
-                        if (cause instanceof TimeoutException) {
-                            retry = true;
-                            log.debug("Could not get number of partitions for topic {} due to timeout. " +
-                                "Will try again (remaining retries {}).", topicName, remainingRetries - 1);
-                        } else if (cause instanceof TopicExistsException) {
-                            // This topic didn't exist earlier, it might be marked for deletion or it might differ
-                            // from the desired setup. It needs re-validation.
-                            final Map<String, Integer> existingTopicPartition = getNumPartitions(Collections.singleton(topicName));
-
-                            if (existingTopicPartition.containsKey(topicName)
-                                    && validateTopicPartitions(Collections.singleton(topics.get(topicName)), existingTopicPartition).isEmpty()) {
-                                createdTopicNames.add(createTopicResult.getKey());
-                                log.info("Topic {} exists already and has the right number of partitions: {}",
-                                        topicName,
-                                        couldNotCreateTopic.toString());
-                            } else {
-                                retry = true;
-                                retryBackOff = true;
-                                log.info("Could not create topic {}. Topic is probably marked for deletion (number of partitions is unknown).\n" +
-                                        "Will retry to create this topic in {} ms (to let broker finish async delete operation first).\n" +
-                                        "Error message was: {}", topicName, retryBackOffMs, couldNotCreateTopic.toString());
-                            }
-                        } else {
-                            throw new StreamsException(String.format("Could not create topic %s.", topicName),
-                                couldNotCreateTopic);
-                        }
+                        topicsNotReady.remove(topicName);
                     } catch (final InterruptedException fatalException) {
+                        // this should not happen; if it ever happens it indicates a bug
                         Thread.currentThread().interrupt();
                         log.error(INTERRUPTED_ERROR_MESSAGE, fatalException);
                         throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException);
+                    } catch (final ExecutionException executionException) {
+                        final Throwable cause = executionException.getCause();
+                        if (cause instanceof TopicExistsException) {
+                            // This topic didn't exist earlier or its leader not known before; just retain it for next round of validation.
+                            log.info("Could not create topic {}. Topic is probably marked for deletion (number of partitions is unknown).\n" +
+                                "Will retry to create this topic in {} ms (to let broker finish async delete operation first).\n" +
+                                "Error message was: {}", topicName, retryBackOffMs, cause.toString());
+                        } else {
+                            log.error("Unexpected error during topic creation for {}.\n" +
+                                "Error message was: {}", topicName, cause.toString());
+                            throw new StreamsException(String.format("Could not create topic %s.", topicName), cause);
+                        }
                     }
                 }
+            }
 
-                if (retry) {
-                    newTopics.removeIf(newTopic -> createdTopicNames.contains(newTopic.name()));
 
-                    continue;
+            if (!topicsNotReady.isEmpty()) {
+                log.info("Topics {} can not be made ready with {} retries left", topicsNotReady, remainingRetries);
+
+                try {
+                    Thread.sleep(retryBackOffMs);
+                } catch (final InterruptedException e) {
+                    // this is okay, we just wake up early
+                    Thread.currentThread().interrupt();
                 }
 
-                return;
-            } while (remainingRetries-- > 0);
+                remainingRetries--;
+            }
+        }
 
-            final String timeoutAndRetryError = "Could not create topics. " +
+        if (!topicsNotReady.isEmpty()) {
+            final String timeoutAndRetryError = String.format("Could not create topics after %d retries. " +
                 "This can happen if the Kafka cluster is temporary not available. " +
-                "You can increase admin client config `retries` to be resilient against this error.";
+                "You can increase admin client config `retries` to be resilient against this error.", retries);
             log.error(timeoutAndRetryError);
             throw new StreamsException(timeoutAndRetryError);
         }
     }
 
     /**
-     * Get the number of partitions for the given topics
+     * Try to get the number of partitions for the given topics; return the number of partitions for topics that already exist.
+     *
+     * Topics whose description could not be retrieved are simply not included in the returned map.
      */
     // visible for testing
     protected Map<String, Integer> getNumPartitions(final Set<String> topics) {
         log.debug("Trying to check if topics {} have been created with expected number of partitions.", topics);
 
-        // TODO: KAFKA-6928. should not need retries in the outer caller as it will be retried internally in admin client
-        int remainingRetries = retries;
-        boolean retry;
-        do {
-            retry = false;
-
-            final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topics);
-            final Map<String, KafkaFuture<TopicDescription>> futures = describeTopicsResult.values();
-
-            final Map<String, Integer> existingNumberOfPartitionsPerTopic = new HashMap<>();
-            for (final Map.Entry<String, KafkaFuture<TopicDescription>> topicFuture : futures.entrySet()) {
-                try {
-                    final TopicDescription topicDescription = topicFuture.getValue().get();
-                    existingNumberOfPartitionsPerTopic.put(
-                        topicFuture.getKey(),
-                        topicDescription.partitions().size());
-                } catch (final InterruptedException fatalException) {
-                    Thread.currentThread().interrupt();
-                    log.error(INTERRUPTED_ERROR_MESSAGE, fatalException);
-                    throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException);
-                } catch (final ExecutionException couldNotDescribeTopicException) {
-                    final Throwable cause = couldNotDescribeTopicException.getCause();
-                    if (cause instanceof TimeoutException) {
-                        retry = true;
-                        log.debug("Could not get number of partitions for topic {} due to timeout. " +
-                            "Will try again (remaining retries {}).", topicFuture.getKey(), remainingRetries - 1);
-                    } else {
-                        final String error = "Could not get number of partitions for topic {} due to {}";
-                        log.debug(error, topicFuture.getKey(), cause.toString());
-                    }
+        final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topics);
+        final Map<String, KafkaFuture<TopicDescription>> futures = describeTopicsResult.values();
+
+        final Map<String, Integer> existedTopicPartition = new HashMap<>();
+        for (final Map.Entry<String, KafkaFuture<TopicDescription>> topicFuture : futures.entrySet()) {
+            final String topicName = topicFuture.getKey();
+            try {
+                final TopicDescription topicDescription = topicFuture.getValue().get();
+                existedTopicPartition.put(
+                    topicFuture.getKey(),
+                    topicDescription.partitions().size());
+            } catch (final InterruptedException fatalException) {
+                // this should not happen; if it ever happens it indicates a bug
+                Thread.currentThread().interrupt();
+                log.error(INTERRUPTED_ERROR_MESSAGE, fatalException);
+                throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException);
+            } catch (final ExecutionException couldNotDescribeTopicException) {
+                final Throwable cause = couldNotDescribeTopicException.getCause();
+                if (cause instanceof UnknownTopicOrPartitionException ||
+                    cause instanceof LeaderNotAvailableException) {
+                    // This topic didn't exist or leader is not known yet, proceed to try to create it
+                    log.debug("Topic {} is unknown, hence not existed yet.", topicName);
+                } else {
+                    log.error("Unexpected error during topic description for {}.\n" +
+                        "Error message was: {}", topicName, cause.toString());
+                    throw new StreamsException(String.format("Could not create topic %s.", topicName), cause);
                 }
             }
+        }
 
-            if (retry) {
-                topics.removeAll(existingNumberOfPartitionsPerTopic.keySet());
-                continue;
-            }
-
-            return existingNumberOfPartitionsPerTopic;
-        } while (remainingRetries-- > 0);
-
-        return Collections.emptyMap();
+        return existedTopicPartition;
     }
 
     /**
-     * Check the existing topics to have correct number of partitions; and return the non existing topics to be created
+     * Check that the given topics exist with the correct number of partitions; and return those among them that still need to be created
      */
-    private Set<InternalTopicConfig> validateTopicPartitions(final Collection<InternalTopicConfig> topicsPartitionsMap,
-                                                             final Map<String, Integer> existingTopicNamesPartitions) {
-        final Set<InternalTopicConfig> topicsToBeCreated = new HashSet<>();
-        for (final InternalTopicConfig topic : topicsPartitionsMap) {
-            final int numberOfPartitions = topic.numberOfPartitions();
-            if (existingTopicNamesPartitions.containsKey(topic.name())) {
-                if (!existingTopicNamesPartitions.get(topic.name()).equals(numberOfPartitions)) {
+    private Set<String> validateTopics(final Set<String> topicsToValidate,
+                                       final Map<String, InternalTopicConfig> topicsMap) {
+
+        final Map<String, Integer> existedTopicPartition = getNumPartitions(topicsToValidate);
+
+        final Set<String> topicsToCreate = new HashSet<>();
+        for (final String topicName : topicsToValidate) {
+            final InternalTopicConfig internalTopicConfig = Utils.notNull(topicsMap.get(topicName));
+            final int numberOfPartitions = internalTopicConfig.numberOfPartitions();
+            if (existedTopicPartition.containsKey(topicName)) {
+                if (!existedTopicPartition.get(topicName).equals(numberOfPartitions)) {
                     final String errorMsg = String.format("Existing internal topic %s has invalid partitions: " +
                             "expected: %d; actual: %d. " +
                             "Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.",
-                        topic.name(), numberOfPartitions, existingTopicNamesPartitions.get(topic.name()));
+                        topicName, numberOfPartitions, existedTopicPartition.get(topicName));
                     log.error(errorMsg);
                     throw new StreamsException(errorMsg);
                 }
             } else {
-                topicsToBeCreated.add(topic);
+                topicsToCreate.add(topicName);
             }
         }
 
-        return topicsToBeCreated;
+        return topicsToCreate;
     }
-
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index 344dc05..e91bf32 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -39,6 +40,7 @@ import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class InternalTopicManagerTest {
@@ -166,28 +168,47 @@ public class InternalTopicManagerTest {
             mockAdminClient,
             new StreamsConfig(config));
 
-        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic, Collections.<String, String>emptyMap());
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic, Collections.emptyMap());
         internalTopicConfig.setNumberOfPartitions(1);
         internalTopicManager2.makeReady(Collections.singletonMap(topic, internalTopicConfig));
     }
 
     @Test
     public void shouldNotThrowExceptionForEmptyTopicMap() {
-        internalTopicManager.makeReady(Collections.<String, InternalTopicConfig>emptyMap());
+        internalTopicManager.makeReady(Collections.emptyMap());
     }
 
     @Test
     public void shouldExhaustRetriesOnTimeoutExceptionForMakeReady() {
-        mockAdminClient.timeoutNextRequest(4);
+        mockAdminClient.timeoutNextRequest(1);
 
-        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic, Collections.<String, String>emptyMap());
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic, Collections.emptyMap());
+        internalTopicConfig.setNumberOfPartitions(1);
+        try {
+            internalTopicManager.makeReady(Collections.singletonMap(topic, internalTopicConfig));
+            fail("Should have thrown StreamsException.");
+        } catch (final StreamsException expected) {
+            assertEquals(TimeoutException.class, expected.getCause().getClass());
+        }
+    }
+
+    @Test
+    public void shouldExhaustRetriesOnMarkedForDeletionTopic() {
+        mockAdminClient.addTopic(
+            false,
+            topic,
+            Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
+            null);
+        mockAdminClient.markTopicForDeletion(topic);
+
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic, Collections.emptyMap());
         internalTopicConfig.setNumberOfPartitions(1);
         try {
             internalTopicManager.makeReady(Collections.singletonMap(topic, internalTopicConfig));
             fail("Should have thrown StreamsException.");
         } catch (final StreamsException expected) {
             assertNull(expected.getCause());
-            assertEquals("Could not create topics. This can happen if the Kafka cluster is temporary not available. You can increase admin client config `retries` to be resilient against this error.", expected.getMessage());
+            assertTrue(expected.getMessage().startsWith("Could not create topics after 1 retries"));
         }
     }
 

Reply via email to