rhauch commented on a change in pull request #8828:
URL: https://github.com/apache/kafka/pull/8828#discussion_r438412308



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -375,6 +383,152 @@ public boolean createTopic(NewTopic topic) {
         return existingTopics;
     }
 
+    /**
+     * Verify the named topic uses only compaction for the cleanup policy.
+     *
+     * @param topic             the name of the topic
+     * @param workerTopicConfig the name of the worker configuration that specifies the topic name
+     * @return true if the admin client could be used to verify the topic setting, or false if
+     *         the verification could not be performed, likely because the admin client principal
+     *         did not have the required permissions or because the broker was older than 0.11.0.0
+     * @throws ConfigException if the actual topic setting did not match the required setting
+     */
+    public boolean verifyTopicCleanupPolicyOnlyCompact(String topic, String workerTopicConfig,
+            String topicPurpose) {
+        Set<String> cleanupPolicies = topicCleanupPolicy(topic);
+        if (cleanupPolicies == null || cleanupPolicies.isEmpty()) {

Review comment:
   After some testing, I've found that even when the topic-specific settings
don't explicitly set `cleanup.policy`, the config description that the admin
client returns for the topic still includes
`cleanup.policy=<broker's log.cleanup.policy>`. So if we have the required
permissions and the topic exists, we will always get a `cleanup.policy` value
for the topic.
   
   I've also cleaned up the semantics of the `topicCleanupPolicy(...)` method
so that it never returns a null set. An empty set now means that either the
topic doesn't exist or the admin client could not return the topic settings
(e.g., old broker or insufficient ACLs). Either way, the null check here is no
longer needed and this logic gets a bit simpler.
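
   To illustrate the simplified check, here is a minimal standalone sketch. It
is not the code in this PR: the class name, the `Map` standing in for the admin
client's described topic configs, and the use of `IllegalStateException` in
place of `ConfigException` are all assumptions made so the example runs without
Kafka on the classpath.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; names and types here are illustrative, not from TopicAdmin.
class CleanupPolicyCheckSketch {

    // Stand-in for topicCleanupPolicy(...): never returns null.
    // The map simulates "topic name -> raw cleanup.policy value" as the admin
    // client might describe it; a missing entry models a nonexistent topic or
    // a failed lookup (old broker, insufficient ACLs).
    static Set<String> topicCleanupPolicy(Map<String, String> describedConfigs, String topic) {
        String raw = describedConfigs.get(topic);
        if (raw == null || raw.trim().isEmpty()) {
            return Collections.emptySet();
        }
        Set<String> policies = new LinkedHashSet<>();
        for (String part : raw.split(",")) {
            String policy = part.trim();
            if (!policy.isEmpty()) {
                policies.add(policy);
            }
        }
        return policies;
    }

    // Returns false when the policy could not be determined, true when it is
    // exactly "compact", and throws otherwise. IllegalStateException is used
    // here only as a stand-in for Kafka's ConfigException.
    static boolean verifyOnlyCompact(Map<String, String> describedConfigs, String topic) {
        Set<String> policies = topicCleanupPolicy(describedConfigs, topic);
        if (policies.isEmpty()) {
            // No null check needed: an empty set is the only "unknown" signal.
            return false;
        }
        if (!policies.equals(Collections.singleton("compact"))) {
            throw new IllegalStateException(
                "Topic '" + topic + "' must have cleanup.policy=compact but has " + policies);
        }
        return true;
    }
}
```

   With this shape, a topic described as `compact` verifies successfully, an
unknown topic yields `false`, and `compact,delete` is rejected with an
exception.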





