DanielWang2035 commented on code in PR #17238:
URL: https://github.com/apache/iotdb/pull/17238#discussion_r3278693902
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java:
##########
@@ -258,6 +282,228 @@ private void checkBeforeAlteringTopicInternal(TopicMeta
topicMeta) throws Subscr
throw new SubscriptionException(exceptionMessage);
}
+ private Map<String, String> safeTopicAttributes(@Nullable final Map<String,
String> attributes) {
+ return Objects.nonNull(attributes) ? attributes : Collections.emptyMap();
+ }
+
+ private void validateTopicConfig(final TopicConfig topicConfig) throws
SubscriptionException {
+ final String mode = topicConfig.getMode();
+ if (!TopicConfig.isValidMode(mode)) {
+ final String exceptionMessage =
+ String.format(
+ "Failed to create or alter topic, unsupported %s=%s, expected
one of [%s, %s, %s]",
+ TopicConstant.MODE_KEY,
+ mode,
+ TopicConstant.MODE_SNAPSHOT_VALUE,
+ TopicConstant.MODE_LIVE_VALUE,
+ TopicConstant.MODE_CONSENSUS_VALUE);
+ LOGGER.warn(exceptionMessage);
+ throw new SubscriptionException(exceptionMessage);
+ }
+
+ validateConsensusProtocolSupport(topicConfig);
+
+ if (topicConfig.isConsensusMode() && !topicConfig.isRecordFormat()) {
+ final String exceptionMessage =
+ String.format(
+ "Failed to create or alter topic, %s=%s only supports %s=%s",
+ TopicConstant.MODE_KEY,
+ TopicConstant.MODE_CONSENSUS_VALUE,
+ TopicConstant.FORMAT_KEY,
+ TopicConstant.FORMAT_RECORD_HANDLER_VALUE);
+ LOGGER.warn(exceptionMessage);
+ throw new SubscriptionException(exceptionMessage);
+ }
+
+ final String orderMode = topicConfig.getOrderMode();
+ if (!TopicConfig.isValidOrderMode(orderMode)) {
+ final String exceptionMessage =
+ String.format(
+ "Failed to create or alter topic, unsupported %s=%s, expected
one of [%s, %s, %s]",
+ TopicConstant.ORDER_MODE_KEY,
+ orderMode,
+ TopicConstant.ORDER_MODE_LEADER_ONLY_VALUE,
+ TopicConstant.ORDER_MODE_MULTI_WRITER_VALUE,
+ TopicConstant.ORDER_MODE_PER_WRITER_VALUE);
+ LOGGER.warn(exceptionMessage);
+ throw new SubscriptionException(exceptionMessage);
+ }
+
+ validateConsensusTableColumnPattern(topicConfig);
+ validateConsensusTopicRetentionConfig(topicConfig);
+ }
+
+ private void validateConsensusProtocolSupport(final TopicConfig topicConfig)
+ throws SubscriptionException {
+ if (!topicConfig.isConsensusMode()) {
+ return;
+ }
+
+ final String actualProtocol =
String.valueOf(CONF.getDataRegionConsensusProtocolClass());
+ if (ConsensusFactory.IOT_CONSENSUS.equals(actualProtocol)) {
+ return;
+ }
+
+ final String exceptionMessage =
+ String.format(
+ "Failed to create or alter topic, %s=%s is only supported when
%s=%s, but current value is %s",
+ TopicConstant.MODE_KEY,
+ TopicConstant.MODE_CONSENSUS_VALUE,
+ DATA_REGION_CONSENSUS_PROTOCOL_CLASS_KEY,
+ ConsensusFactory.IOT_CONSENSUS,
+ actualProtocol);
+ LOGGER.warn(exceptionMessage);
+ throw new SubscriptionException(exceptionMessage);
+ }
+
+ private void validateConsensusTableColumnPattern(final TopicConfig
topicConfig)
+ throws SubscriptionException {
+ if (!topicConfig.hasAttribute(TopicConstant.COLUMN_KEY)) {
+ return;
+ }
+
+ if (!topicConfig.isTableTopic()) {
+ final String exceptionMessage =
+ String.format(
+ "Failed to create or alter topic, %s is only supported for table
topics",
+ TopicConstant.COLUMN_KEY);
+ LOGGER.warn(exceptionMessage);
+ throw new SubscriptionException(exceptionMessage);
+ }
+
+ if (!isConsensusBasedTopicConfig(topicConfig)) {
+ final String exceptionMessage =
+ String.format(
+ "Failed to create or alter topic, %s is only supported for
consensus table topics",
+ TopicConstant.COLUMN_KEY);
+ LOGGER.warn(exceptionMessage);
+ throw new SubscriptionException(exceptionMessage);
+ }
+
+ final String columnPattern =
+ topicConfig.getStringOrDefault(
+ TopicConstant.COLUMN_KEY, TopicConstant.COLUMN_DEFAULT_VALUE);
+ try {
+ Pattern.compile(columnPattern);
+ } catch (final PatternSyntaxException e) {
+ final String exceptionMessage =
+ String.format(
+ "Failed to create or alter topic, illegal %s=%s, detail: %s",
+ TopicConstant.COLUMN_KEY, columnPattern, e.getMessage());
+ LOGGER.warn(exceptionMessage, e);
+ throw new SubscriptionException(exceptionMessage);
+ }
+ }
+
+ private boolean isConsensusBasedTopicConfig(final TopicConfig topicConfig) {
+ return topicConfig.isConsensusMode();
+ }
+
+ private void validateConsensusTopicRetentionConfig(final TopicConfig
topicConfig)
+ throws SubscriptionException {
+ if (!topicConfig.hasAttribute(TopicConstant.RETENTION_BYTES_KEY)
+ && !topicConfig.hasAttribute(TopicConstant.RETENTION_MS_KEY)) {
+ return;
+ }
+
+ if (!isConsensusBasedTopicConfig(topicConfig)) {
+ final String exceptionMessage =
+ String.format(
+ "Failed to create or alter topic, %s and %s are only supported
for consensus topics",
+ TopicConstant.RETENTION_BYTES_KEY,
TopicConstant.RETENTION_MS_KEY);
+ LOGGER.warn(exceptionMessage);
+ throw new SubscriptionException(exceptionMessage);
+ }
+
+ validateRetentionValue(topicConfig, TopicConstant.RETENTION_BYTES_KEY);
+ validateRetentionValue(topicConfig, TopicConstant.RETENTION_MS_KEY);
Review Comment:
Kafka uses raw numeric values for retention.bytes and retention.ms; the unit
is encoded in the config key. Our code currently behaves the same way, so
I kept it unchanged.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]