kkonstantine commented on a change in pull request #8828:
URL: https://github.com/apache/kafka/pull/8828#discussion_r438488444



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -375,6 +383,162 @@ public boolean createTopic(NewTopic topic) {
         return existingTopics;
     }
 
+    /**
+     * Verify the named topic uses only compaction for the cleanup policy.
+     *
+     * @param topic             the name of the topic
+     * @param workerTopicConfig the name of the worker configuration that 
specifies the topic name
+     * @return true if the admin client could be used to verify the topic 
setting, or false if
+     *         the verification could not be performed, likely because the 
admin client principal
+     *         did not have the required permissions or because the broker was 
older than 0.11.0.0
+     * @throws ConfigException if the actual topic setting did not match the 
required setting
+     */
+    public boolean verifyTopicCleanupPolicyOnlyCompact(String topic, String 
workerTopicConfig,
+            String topicPurpose) {
+        Set<String> cleanupPolicies = topicCleanupPolicy(topic);
+        if (cleanupPolicies.isEmpty()) {
+            log.debug("Unable to use admin client to verify the cleanup policy 
of '{}' "
+                      + "topic is '{}', either because the broker is an older "
+                      + "version or because the Kafka principal used for 
Connect "
+                      + "internal topics does not have the required permission 
to "
+                      + "describe topic configurations.", topic, 
TopicConfig.CLEANUP_POLICY_COMPACT);
+            return false;
+        }
+        Set<String> expectedPolicies = 
Collections.singleton(TopicConfig.CLEANUP_POLICY_COMPACT);
+        if (!cleanupPolicies.equals(expectedPolicies)) {
+            String expectedPolicyStr = String.join(",", expectedPolicies);
+            String cleanupPolicyStr = String.join(",", cleanupPolicies);
+            String msg = String.format("Topic '%s' supplied via the '%s' 
property is required "
+                    + "to have '%s=%s' to guarantee consistency and durability 
of "
+                    + "%s, but found the topic currently has '%s=%s'. 
Continuing would likely "
+                    + "result in eventually losing %s and problems restarting 
this Connect "
+                    + "cluster in the future. Change the '%s' property in the "
+                    + "Connect worker configurations to use a topic with 
'%s=%s'.",
+                    topic, workerTopicConfig, 
TopicConfig.CLEANUP_POLICY_CONFIG, expectedPolicyStr,
+                    topicPurpose, TopicConfig.CLEANUP_POLICY_CONFIG, 
cleanupPolicyStr, topicPurpose,
+                    workerTopicConfig, TopicConfig.CLEANUP_POLICY_CONFIG, 
expectedPolicyStr);
+            throw new ConfigException(msg);
+        }
+        return true;
+    }
+
+    /**
+     * Get the cleanup policy for a topic.
+     *
+     * @param topic the name of the topic
+     * @return the set of cleanup policies set for the topic; may be empty if 
the topic does not
+     *         exist or the topic's cleanup policy could not be retrieved
+     */
+    public Set<String> topicCleanupPolicy(String topic) {
+        Config topicConfig = describeTopicConfig(topic);
+        if (topicConfig == null) {
+            // The topic must not exist
+            log.debug("Unable to find topic '{}' when getting cleanup policy", 
topic);
+            return Collections.emptySet();
+        }
+        ConfigEntry entry = topicConfig.get(CLEANUP_POLICY_CONFIG);
+        if (entry != null && entry.value() != null) {
+            String policyStr = entry.value();
+            log.debug("Found cleanup.policy={} for topic '{}'", policyStr, 
topic);

Review comment:
       Same question about the log level as for the `log.debug` call in `verifyTopicCleanupPolicyOnlyCompact`: should this be `info`?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -375,6 +383,162 @@ public boolean createTopic(NewTopic topic) {
         return existingTopics;
     }
 
+    /**
+     * Verify the named topic uses only compaction for the cleanup policy.
+     *
+     * @param topic             the name of the topic
+     * @param workerTopicConfig the name of the worker configuration that 
specifies the topic name
+     * @return true if the admin client could be used to verify the topic 
setting, or false if
+     *         the verification could not be performed, likely because the 
admin client principal
+     *         did not have the required permissions or because the broker was 
older than 0.11.0.0
+     * @throws ConfigException if the actual topic setting did not match the 
required setting
+     */
+    public boolean verifyTopicCleanupPolicyOnlyCompact(String topic, String 
workerTopicConfig,
+            String topicPurpose) {
+        Set<String> cleanupPolicies = topicCleanupPolicy(topic);
+        if (cleanupPolicies.isEmpty()) {
+            log.debug("Unable to use admin client to verify the cleanup policy 
of '{}' "

Review comment:
       Should we consider `info`? It's a one-time message, right?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -375,6 +383,162 @@ public boolean createTopic(NewTopic topic) {
         return existingTopics;
     }
 
+    /**
+     * Verify the named topic uses only compaction for the cleanup policy.
+     *
+     * @param topic             the name of the topic
+     * @param workerTopicConfig the name of the worker configuration that 
specifies the topic name
+     * @return true if the admin client could be used to verify the topic 
setting, or false if
+     *         the verification could not be performed, likely because the 
admin client principal
+     *         did not have the required permissions or because the broker was 
older than 0.11.0.0
+     * @throws ConfigException if the actual topic setting did not match the 
required setting
+     */
+    public boolean verifyTopicCleanupPolicyOnlyCompact(String topic, String 
workerTopicConfig,
+            String topicPurpose) {
+        Set<String> cleanupPolicies = topicCleanupPolicy(topic);
+        if (cleanupPolicies.isEmpty()) {
+            log.debug("Unable to use admin client to verify the cleanup policy 
of '{}' "
+                      + "topic is '{}', either because the broker is an older "
+                      + "version or because the Kafka principal used for 
Connect "
+                      + "internal topics does not have the required permission 
to "
+                      + "describe topic configurations.", topic, 
TopicConfig.CLEANUP_POLICY_COMPACT);
+            return false;
+        }
+        Set<String> expectedPolicies = 
Collections.singleton(TopicConfig.CLEANUP_POLICY_COMPACT);
+        if (!cleanupPolicies.equals(expectedPolicies)) {
+            String expectedPolicyStr = String.join(",", expectedPolicies);
+            String cleanupPolicyStr = String.join(",", cleanupPolicies);
+            String msg = String.format("Topic '%s' supplied via the '%s' 
property is required "
+                    + "to have '%s=%s' to guarantee consistency and durability 
of "
+                    + "%s, but found the topic currently has '%s=%s'. 
Continuing would likely "
+                    + "result in eventually losing %s and problems restarting 
this Connect "
+                    + "cluster in the future. Change the '%s' property in the "
+                    + "Connect worker configurations to use a topic with 
'%s=%s'.",
+                    topic, workerTopicConfig, 
TopicConfig.CLEANUP_POLICY_CONFIG, expectedPolicyStr,
+                    topicPurpose, TopicConfig.CLEANUP_POLICY_CONFIG, 
cleanupPolicyStr, topicPurpose,
+                    workerTopicConfig, TopicConfig.CLEANUP_POLICY_CONFIG, 
expectedPolicyStr);
+            throw new ConfigException(msg);
+        }
+        return true;
+    }
+
+    /**
+     * Get the cleanup policy for a topic.
+     *
+     * @param topic the name of the topic
+     * @return the set of cleanup policies set for the topic; may be empty if 
the topic does not
+     *         exist or the topic's cleanup policy could not be retrieved
+     */
+    public Set<String> topicCleanupPolicy(String topic) {
+        Config topicConfig = describeTopicConfig(topic);
+        if (topicConfig == null) {
+            // The topic must not exist
+            log.debug("Unable to find topic '{}' when getting cleanup policy", 
topic);
+            return Collections.emptySet();
+        }
+        ConfigEntry entry = topicConfig.get(CLEANUP_POLICY_CONFIG);
+        if (entry != null && entry.value() != null) {
+            String policyStr = entry.value();
+            log.debug("Found cleanup.policy={} for topic '{}'", policyStr, 
topic);
+            return Arrays.stream(policyStr.split(","))
+                         .map(String::trim)
+                         .filter(s -> !s.isEmpty())
+                         .map(String::toLowerCase)
+                         .collect(Collectors.toSet());
+        }
+        // This is unexpected, as the topic config should include the 
cleanup.policy even if
+        // the topic settings don't override the broker's log.cleanup.policy. 
But just to be safe.
+        log.debug("Found no cleanup.policy for topic '{}'", topic);

Review comment:
       Same question about the log level as above: should this be `info` rather than `debug`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to