DanielWang2035 commented on code in PR #17238:
URL: https://github.com/apache/iotdb/pull/17238#discussion_r3278693902


##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java:
##########
@@ -258,6 +282,228 @@ private void checkBeforeAlteringTopicInternal(TopicMeta 
topicMeta) throws Subscr
     throw new SubscriptionException(exceptionMessage);
   }
 
+  private Map<String, String> safeTopicAttributes(@Nullable final Map<String, 
String> attributes) {
+    return Objects.nonNull(attributes) ? attributes : Collections.emptyMap();
+  }
+
+  private void validateTopicConfig(final TopicConfig topicConfig) throws 
SubscriptionException {
+    final String mode = topicConfig.getMode();
+    if (!TopicConfig.isValidMode(mode)) {
+      final String exceptionMessage =
+          String.format(
+              "Failed to create or alter topic, unsupported %s=%s, expected 
one of [%s, %s, %s]",
+              TopicConstant.MODE_KEY,
+              mode,
+              TopicConstant.MODE_SNAPSHOT_VALUE,
+              TopicConstant.MODE_LIVE_VALUE,
+              TopicConstant.MODE_CONSENSUS_VALUE);
+      LOGGER.warn(exceptionMessage);
+      throw new SubscriptionException(exceptionMessage);
+    }
+
+    validateConsensusProtocolSupport(topicConfig);
+
+    if (topicConfig.isConsensusMode() && !topicConfig.isRecordFormat()) {
+      final String exceptionMessage =
+          String.format(
+              "Failed to create or alter topic, %s=%s only supports %s=%s",
+              TopicConstant.MODE_KEY,
+              TopicConstant.MODE_CONSENSUS_VALUE,
+              TopicConstant.FORMAT_KEY,
+              TopicConstant.FORMAT_RECORD_HANDLER_VALUE);
+      LOGGER.warn(exceptionMessage);
+      throw new SubscriptionException(exceptionMessage);
+    }
+
+    final String orderMode = topicConfig.getOrderMode();
+    if (!TopicConfig.isValidOrderMode(orderMode)) {
+      final String exceptionMessage =
+          String.format(
+              "Failed to create or alter topic, unsupported %s=%s, expected 
one of [%s, %s, %s]",
+              TopicConstant.ORDER_MODE_KEY,
+              orderMode,
+              TopicConstant.ORDER_MODE_LEADER_ONLY_VALUE,
+              TopicConstant.ORDER_MODE_MULTI_WRITER_VALUE,
+              TopicConstant.ORDER_MODE_PER_WRITER_VALUE);
+      LOGGER.warn(exceptionMessage);
+      throw new SubscriptionException(exceptionMessage);
+    }
+
+    validateConsensusTableColumnPattern(topicConfig);
+    validateConsensusTopicRetentionConfig(topicConfig);
+  }
+
+  private void validateConsensusProtocolSupport(final TopicConfig topicConfig)
+      throws SubscriptionException {
+    if (!topicConfig.isConsensusMode()) {
+      return;
+    }
+
+    final String actualProtocol = 
String.valueOf(CONF.getDataRegionConsensusProtocolClass());
+    if (ConsensusFactory.IOT_CONSENSUS.equals(actualProtocol)) {
+      return;
+    }
+
+    final String exceptionMessage =
+        String.format(
+            "Failed to create or alter topic, %s=%s is only supported when 
%s=%s, but current value is %s",
+            TopicConstant.MODE_KEY,
+            TopicConstant.MODE_CONSENSUS_VALUE,
+            DATA_REGION_CONSENSUS_PROTOCOL_CLASS_KEY,
+            ConsensusFactory.IOT_CONSENSUS,
+            actualProtocol);
+    LOGGER.warn(exceptionMessage);
+    throw new SubscriptionException(exceptionMessage);
+  }
+
+  private void validateConsensusTableColumnPattern(final TopicConfig 
topicConfig)
+      throws SubscriptionException {
+    if (!topicConfig.hasAttribute(TopicConstant.COLUMN_KEY)) {
+      return;
+    }
+
+    if (!topicConfig.isTableTopic()) {
+      final String exceptionMessage =
+          String.format(
+              "Failed to create or alter topic, %s is only supported for table 
topics",
+              TopicConstant.COLUMN_KEY);
+      LOGGER.warn(exceptionMessage);
+      throw new SubscriptionException(exceptionMessage);
+    }
+
+    if (!isConsensusBasedTopicConfig(topicConfig)) {
+      final String exceptionMessage =
+          String.format(
+              "Failed to create or alter topic, %s is only supported for 
consensus table topics",
+              TopicConstant.COLUMN_KEY);
+      LOGGER.warn(exceptionMessage);
+      throw new SubscriptionException(exceptionMessage);
+    }
+
+    final String columnPattern =
+        topicConfig.getStringOrDefault(
+            TopicConstant.COLUMN_KEY, TopicConstant.COLUMN_DEFAULT_VALUE);
+    try {
+      Pattern.compile(columnPattern);
+    } catch (final PatternSyntaxException e) {
+      final String exceptionMessage =
+          String.format(
+              "Failed to create or alter topic, illegal %s=%s, detail: %s",
+              TopicConstant.COLUMN_KEY, columnPattern, e.getMessage());
+      LOGGER.warn(exceptionMessage, e);
+      throw new SubscriptionException(exceptionMessage);
+    }
+  }
+
+  private boolean isConsensusBasedTopicConfig(final TopicConfig topicConfig) {
+    return topicConfig.isConsensusMode();
+  }
+
+  private void validateConsensusTopicRetentionConfig(final TopicConfig 
topicConfig)
+      throws SubscriptionException {
+    if (!topicConfig.hasAttribute(TopicConstant.RETENTION_BYTES_KEY)
+        && !topicConfig.hasAttribute(TopicConstant.RETENTION_MS_KEY)) {
+      return;
+    }
+
+    if (!isConsensusBasedTopicConfig(topicConfig)) {
+      final String exceptionMessage =
+          String.format(
+              "Failed to create or alter topic, %s and %s are only supported 
for consensus topics",
+              TopicConstant.RETENTION_BYTES_KEY, 
TopicConstant.RETENTION_MS_KEY);
+      LOGGER.warn(exceptionMessage);
+      throw new SubscriptionException(exceptionMessage);
+    }
+
+    validateRetentionValue(topicConfig, TopicConstant.RETENTION_BYTES_KEY);
+    validateRetentionValue(topicConfig, TopicConstant.RETENTION_MS_KEY);

Review Comment:
   Kafka uses raw numeric values for retention.bytes and retention.ms, the unit 
is encoded in the config key. The current behavior in our code is the same, so 
I kept it unchanged.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to