Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
chia7712 merged PR #15670:
URL: https://github.com/apache/kafka/pull/15670

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
chia7712 commented on PR #15670:
URL: https://github.com/apache/kafka/pull/15670#issuecomment-2049994583

The `JDK 8 and Scala 2.12` build stopped abnormally. Could you please trigger QA again?
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
OmniaGM commented on code in PR #15670:
URL: https://github.com/apache/kafka/pull/15670#discussion_r1560841706

## build.gradle: ##
@@ -1828,6 +1828,7 @@ project(':storage') {
 implementation project(':storage:storage-api')
 implementation project(':server-common')
 implementation project(':clients')
+implementation project(':transaction-coordinator')

Review Comment:
done
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
chia7712 commented on code in PR #15670:
URL: https://github.com/apache/kafka/pull/15670#discussion_r1560836816

## build.gradle: ##
@@ -1828,6 +1828,7 @@ project(':storage') {
 implementation project(':storage:storage-api')
 implementation project(':server-common')
 implementation project(':clients')
+implementation project(':transaction-coordinator')

Review Comment:
We can remove this now.
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
OmniaGM commented on PR #15670:
URL: https://github.com/apache/kafka/pull/15670#issuecomment-2049439006

> @OmniaGM Sorry, I just merged a PR and it caused conflicts with this PR. Could you please fix them?

Just rebased.
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
OmniaGM commented on code in PR #15670:
URL: https://github.com/apache/kafka/pull/15670#discussion_r1560820501

## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManagerConfig.java: ##
@@ -17,13 +17,12 @@
 package org.apache.kafka.storage.internals.log;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfigs;
 import java.util.Set;
 public class ProducerStateManagerConfig {
-public static final String PRODUCER_ID_EXPIRATION_MS = "producer.id.expiration.ms";
-public static final String TRANSACTION_VERIFICATION_ENABLED = "transaction.partition.verification.enable";
-public static final Set<String> RECONFIGURABLE_CONFIGS = Utils.mkSet(PRODUCER_ID_EXPIRATION_MS, TRANSACTION_VERIFICATION_ENABLED);
+public static final Set<String> RECONFIGURABLE_CONFIGS = Utils.mkSet(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG, TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG);

Review Comment:
I think this follows the pattern in `DynamicBrokerConfig.AllDynamicConfigs`, which refers to the `RECONFIGURABLE_CONFIGS` list from each module. But I can also see that we have already broken this pattern in `DynamicBrokerConfig.DynamicSecurityConfigs`, `DynamicBrokerConfig.DynamicPasswordConfigs` and `DynamicBrokerConfig.ReloadableFileConfigs`. I'll move these to `DynamicBrokerConfig`, following the same pattern as `DynamicSecurityConfigs`.
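
A minimal sketch of the arrangement discussed above, in which one class owns the set of dynamically reconfigurable keys instead of each module exporting its own `RECONFIGURABLE_CONFIGS`. The class names `TxnLogConfigsSketch` and `DynamicConfigsSketch` and the helper `isReconfigurable` are hypothetical stand-ins, not Kafka classes; only the two config key strings are taken from the diff.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for the constants class: it only names the keys.
final class TxnLogConfigsSketch {
    static final String PRODUCER_ID_EXPIRATION_MS_CONFIG = "producer.id.expiration.ms";
    static final String TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG =
        "transaction.partition.verification.enable";

    private TxnLogConfigsSketch() { }
}

// Hypothetical stand-in for the class that decides which keys are reconfigurable.
final class DynamicConfigsSketch {
    // The set lives next to the logic that consumes it, not in the storage module.
    static final Set<String> DYNAMIC_PRODUCER_STATE_MANAGER_CONFIGS = setOf(
        TxnLogConfigsSketch.PRODUCER_ID_EXPIRATION_MS_CONFIG,
        TxnLogConfigsSketch.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG);

    private DynamicConfigsSketch() { }

    // Builds an unmodifiable, insertion-ordered set (Java 8 compatible).
    private static Set<String> setOf(String... keys) {
        Set<String> result = new LinkedHashSet<>();
        Collections.addAll(result, keys);
        return Collections.unmodifiableSet(result);
    }

    static boolean isReconfigurable(String key) {
        return DYNAMIC_PRODUCER_STATE_MANAGER_CONFIGS.contains(key);
    }

    public static void main(String[] args) {
        System.out.println(isReconfigurable("producer.id.expiration.ms"));
        System.out.println(isReconfigurable("transaction.state.log.num.partitions"));
    }
}
```

With this layout the storage module no longer needs to import the transaction constants just to publish a set of keys.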
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
chia7712 commented on PR #15670:
URL: https://github.com/apache/kafka/pull/15670#issuecomment-2049411941

@OmniaGM Sorry, I just merged a PR and it caused conflicts with this PR. Could you please fix them?
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
OmniaGM commented on code in PR #15670:
URL: https://github.com/apache/kafka/pull/15670#discussion_r1560809899

## core/src/main/scala/kafka/server/KafkaConfig.scala: ##
@@ -1062,21 +1027,21 @@ object KafkaConfig {
 .define(CompressionTypeProp, STRING, LogConfig.DEFAULT_COMPRESSION_TYPE, in(BrokerCompressionType.names.asScala.toSeq:_*), HIGH, CompressionTypeDoc)
 /** * Transaction management configuration ***/
- .define(TransactionalIdExpirationMsProp, INT, Defaults.TRANSACTIONAL_ID_EXPIRATION_MS, atLeast(1), HIGH, TransactionalIdExpirationMsDoc)
- .define(TransactionsMaxTimeoutMsProp, INT, Defaults.TRANSACTIONS_MAX_TIMEOUT_MS, atLeast(1), HIGH, TransactionsMaxTimeoutMsDoc)
- .define(TransactionsTopicMinISRProp, INT, Defaults.TRANSACTIONS_TOPIC_MIN_ISR, atLeast(1), HIGH, TransactionsTopicMinISRDoc)
- .define(TransactionsLoadBufferSizeProp, INT, Defaults.TRANSACTIONS_LOAD_BUFFER_SIZE, atLeast(1), HIGH, TransactionsLoadBufferSizeDoc)
- .define(TransactionsTopicReplicationFactorProp, SHORT, Defaults.TRANSACTIONS_TOPIC_REPLICATION_FACTOR, atLeast(1), HIGH, TransactionsTopicReplicationFactorDoc)
- .define(TransactionsTopicPartitionsProp, INT, Defaults.TRANSACTIONS_TOPIC_PARTITIONS, atLeast(1), HIGH, TransactionsTopicPartitionsDoc)
- .define(TransactionsTopicSegmentBytesProp, INT, Defaults.TRANSACTIONS_TOPIC_SEGMENT_BYTES, atLeast(1), HIGH, TransactionsTopicSegmentBytesDoc)
- .define(TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, INT, Defaults.TRANSACTIONS_ABORT_TIMED_OUT_CLEANUP_INTERVAL_MS, atLeast(1), LOW, TransactionsAbortTimedOutTransactionsIntervalMsDoc)
- .define(TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp, INT, Defaults.TRANSACTIONS_REMOVE_EXPIRED_CLEANUP_INTERVAL_MS, atLeast(1), LOW, TransactionsRemoveExpiredTransactionsIntervalMsDoc)
-
- .define(TransactionPartitionVerificationEnableProp, BOOLEAN, Defaults.TRANSACTION_PARTITION_VERIFICATION_ENABLE, LOW, TransactionPartitionVerificationEnableDoc)
-
- .define(ProducerIdExpirationMsProp, INT, Defaults.PRODUCER_ID_EXPIRATION_MS, atLeast(1), LOW, ProducerIdExpirationMsDoc)
+ .define(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, INT, TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_DOC)

Review Comment:
Sorry, somehow this comment dropped off my radar. I just pushed a fix, thanks.
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
chia7712 commented on code in PR #15670:
URL: https://github.com/apache/kafka/pull/15670#discussion_r1560802777

## core/src/main/scala/kafka/server/KafkaConfig.scala: ##
@@ -1062,21 +1027,21 @@ object KafkaConfig {
 .define(CompressionTypeProp, STRING, LogConfig.DEFAULT_COMPRESSION_TYPE, in(BrokerCompressionType.names.asScala.toSeq:_*), HIGH, CompressionTypeDoc)
 /** * Transaction management configuration ***/
- .define(TransactionalIdExpirationMsProp, INT, Defaults.TRANSACTIONAL_ID_EXPIRATION_MS, atLeast(1), HIGH, TransactionalIdExpirationMsDoc)
- .define(TransactionsMaxTimeoutMsProp, INT, Defaults.TRANSACTIONS_MAX_TIMEOUT_MS, atLeast(1), HIGH, TransactionsMaxTimeoutMsDoc)
- .define(TransactionsTopicMinISRProp, INT, Defaults.TRANSACTIONS_TOPIC_MIN_ISR, atLeast(1), HIGH, TransactionsTopicMinISRDoc)
- .define(TransactionsLoadBufferSizeProp, INT, Defaults.TRANSACTIONS_LOAD_BUFFER_SIZE, atLeast(1), HIGH, TransactionsLoadBufferSizeDoc)
- .define(TransactionsTopicReplicationFactorProp, SHORT, Defaults.TRANSACTIONS_TOPIC_REPLICATION_FACTOR, atLeast(1), HIGH, TransactionsTopicReplicationFactorDoc)
- .define(TransactionsTopicPartitionsProp, INT, Defaults.TRANSACTIONS_TOPIC_PARTITIONS, atLeast(1), HIGH, TransactionsTopicPartitionsDoc)
- .define(TransactionsTopicSegmentBytesProp, INT, Defaults.TRANSACTIONS_TOPIC_SEGMENT_BYTES, atLeast(1), HIGH, TransactionsTopicSegmentBytesDoc)
- .define(TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, INT, Defaults.TRANSACTIONS_ABORT_TIMED_OUT_CLEANUP_INTERVAL_MS, atLeast(1), LOW, TransactionsAbortTimedOutTransactionsIntervalMsDoc)
- .define(TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp, INT, Defaults.TRANSACTIONS_REMOVE_EXPIRED_CLEANUP_INTERVAL_MS, atLeast(1), LOW, TransactionsRemoveExpiredTransactionsIntervalMsDoc)
-
- .define(TransactionPartitionVerificationEnableProp, BOOLEAN, Defaults.TRANSACTION_PARTITION_VERIFICATION_ENABLE, LOW, TransactionPartitionVerificationEnableDoc)
-
- .define(ProducerIdExpirationMsProp, INT, Defaults.PRODUCER_ID_EXPIRATION_MS, atLeast(1), LOW, ProducerIdExpirationMsDoc)
+ .define(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, INT, TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_DOC)

Review Comment:
@OmniaGM Could you please take a look at this comment?
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
chia7712 commented on code in PR #15670:
URL: https://github.com/apache/kafka/pull/15670#discussion_r1559526460

## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManagerConfig.java: ##
@@ -17,13 +17,12 @@
 package org.apache.kafka.storage.internals.log;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfigs;
 import java.util.Set;
 public class ProducerStateManagerConfig {
-public static final String PRODUCER_ID_EXPIRATION_MS = "producer.id.expiration.ms";
-public static final String TRANSACTION_VERIFICATION_ENABLED = "transaction.partition.verification.enable";
-public static final Set<String> RECONFIGURABLE_CONFIGS = Utils.mkSet(PRODUCER_ID_EXPIRATION_MS, TRANSACTION_VERIFICATION_ENABLED);
+public static final Set<String> RECONFIGURABLE_CONFIGS = Utils.mkSet(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG, TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG);

Review Comment:
Do we need to put those in `ProducerStateManagerConfig`? `DynamicBrokerConfig` is the class that decides which configs are reconfigurable (https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L1136), so it seems to me that those reconfigurable configs should be moved to `DynamicBrokerConfig`.

## core/src/main/scala/kafka/server/KafkaConfig.scala: ##
@@ -1062,21 +1027,21 @@ object KafkaConfig {
 .define(CompressionTypeProp, STRING, LogConfig.DEFAULT_COMPRESSION_TYPE, in(BrokerCompressionType.names.asScala.toSeq:_*), HIGH, CompressionTypeDoc)
 /** * Transaction management configuration ***/
- .define(TransactionalIdExpirationMsProp, INT, Defaults.TRANSACTIONAL_ID_EXPIRATION_MS, atLeast(1), HIGH, TransactionalIdExpirationMsDoc)
- .define(TransactionsMaxTimeoutMsProp, INT, Defaults.TRANSACTIONS_MAX_TIMEOUT_MS, atLeast(1), HIGH, TransactionsMaxTimeoutMsDoc)
- .define(TransactionsTopicMinISRProp, INT, Defaults.TRANSACTIONS_TOPIC_MIN_ISR, atLeast(1), HIGH, TransactionsTopicMinISRDoc)
- .define(TransactionsLoadBufferSizeProp, INT, Defaults.TRANSACTIONS_LOAD_BUFFER_SIZE, atLeast(1), HIGH, TransactionsLoadBufferSizeDoc)
- .define(TransactionsTopicReplicationFactorProp, SHORT, Defaults.TRANSACTIONS_TOPIC_REPLICATION_FACTOR, atLeast(1), HIGH, TransactionsTopicReplicationFactorDoc)
- .define(TransactionsTopicPartitionsProp, INT, Defaults.TRANSACTIONS_TOPIC_PARTITIONS, atLeast(1), HIGH, TransactionsTopicPartitionsDoc)
- .define(TransactionsTopicSegmentBytesProp, INT, Defaults.TRANSACTIONS_TOPIC_SEGMENT_BYTES, atLeast(1), HIGH, TransactionsTopicSegmentBytesDoc)
- .define(TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, INT, Defaults.TRANSACTIONS_ABORT_TIMED_OUT_CLEANUP_INTERVAL_MS, atLeast(1), LOW, TransactionsAbortTimedOutTransactionsIntervalMsDoc)
- .define(TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp, INT, Defaults.TRANSACTIONS_REMOVE_EXPIRED_CLEANUP_INTERVAL_MS, atLeast(1), LOW, TransactionsRemoveExpiredTransactionsIntervalMsDoc)
-
- .define(TransactionPartitionVerificationEnableProp, BOOLEAN, Defaults.TRANSACTION_PARTITION_VERIFICATION_ENABLE, LOW, TransactionPartitionVerificationEnableDoc)
-
- .define(ProducerIdExpirationMsProp, INT, Defaults.PRODUCER_ID_EXPIRATION_MS, atLeast(1), LOW, ProducerIdExpirationMsDoc)
+ .define(TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, INT, TransactionStateManagerConfigs.TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, TransactionStateManagerConfigs.TRANSACTIONAL_ID_EXPIRATION_MS_DOC)

Review Comment:
`TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT` should be replaced by `TRANSACTIONAL_ID_EXPIRATION_MS_DEFAULT`.
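
To illustrate why the mismatched default flagged above matters, here is a small sketch of the corrected key/default pairing. It does not use Kafka's `ConfigDef`; `DefaultsSketch` and `TxnStateManagerConfigsSketch` are hypothetical stand-ins. The key strings and the numeric defaults (7 days and 15 minutes) are assumptions based on Kafka's published broker configuration and do not appear in this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical constants class; key names and values are assumptions.
final class TxnStateManagerConfigsSketch {
    static final String TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG = "transactional.id.expiration.ms";
    static final int TRANSACTIONAL_ID_EXPIRATION_MS_DEFAULT = (int) TimeUnit.DAYS.toMillis(7);

    static final String TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG = "transaction.max.timeout.ms";
    static final int TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT = (int) TimeUnit.MINUTES.toMillis(15);

    private TxnStateManagerConfigsSketch() { }
}

// Hypothetical, much simplified registry of key -> default value.
final class DefaultsSketch {
    private final Map<String, Integer> defaults = new LinkedHashMap<>();

    DefaultsSketch define(String key, int defaultValue) {
        if (defaults.containsKey(key)) {
            throw new IllegalArgumentException("Config defined twice: " + key);
        }
        defaults.put(key, defaultValue);
        return this;
    }

    int defaultOf(String key) {
        return defaults.get(key);
    }

    // Each key is paired with the default that shares its name prefix.
    static DefaultsSketch corrected() {
        return new DefaultsSketch()
            .define(TxnStateManagerConfigsSketch.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG,
                    TxnStateManagerConfigsSketch.TRANSACTIONAL_ID_EXPIRATION_MS_DEFAULT)
            .define(TxnStateManagerConfigsSketch.TRANSACTIONS_MAX_TIMEOUT_MS_CONFIG,
                    TxnStateManagerConfigsSketch.TRANSACTIONS_MAX_TIMEOUT_MS_DEFAULT);
    }

    public static void main(String[] args) {
        DefaultsSketch defs = corrected();
        System.out.println(defs.defaultOf("transactional.id.expiration.ms"));
        System.out.println(defs.defaultOf("transaction.max.timeout.ms"));
    }
}
```

Under these assumed values, pairing the expiration key with the max-timeout default would shrink the default expiration from 7 days to 15 minutes while still compiling, because both constants are `int`.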
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
OmniaGM commented on code in PR #15670:
URL: https://github.com/apache/kafka/pull/15670#discussion_r1556505842

## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ##
@@ -16,11 +16,43 @@
 */
 package org.apache.kafka.coordinator.transaction;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+
 public class TransactionLogConfig {
 // Log-level config default values
-public static final int DEFAULT_NUM_PARTITIONS = 50;
-public static final int DEFAULT_SEGMENT_BYTES = 100 * 1024 * 1024;
-public static final short DEFAULT_REPLICATION_FACTOR = 3;
-public static final int DEFAULT_MIN_IN_SYNC_REPLICAS = 2;
-public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024;
+public static final String TRANSACTIONS_TOPIC_PARTITIONS_CONFIG = "transaction.state.log.num.partitions";
+public static final int TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT = 50;
+public static final String TRANSACTIONS_TOPIC_PARTITIONS_DOC = "The number of partitions for the transaction topic (should not change after deployment).";
+
+public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG = "transaction.state.log.segment.bytes";
+public static final int TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 1024 * 1024;
+public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC = "The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads";
+
+public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG = "transaction.state.log.replication.factor";
+public static final short TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3;
+public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC = "The replication factor for the transaction topic (set higher to ensure availability). " +
+"Internal topic creation will fail until the cluster size meets this replication factor requirement.";
+
+public static final String TRANSACTIONS_TOPIC_MIN_ISR_CONFIG = "transaction.state.log.min.isr";
+public static final int TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT = 2;
+public static final String TRANSACTIONS_TOPIC_MIN_ISR_DOC = "Overridden " + ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) + " config for the transaction topic.";

Review Comment:
That is a good point. I rephrased it so that it doesn't rely on referring to `TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG`.
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
chia7712 commented on code in PR #15670:
URL: https://github.com/apache/kafka/pull/15670#discussion_r1556496340

## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ##
@@ -16,11 +16,43 @@
 */
 package org.apache.kafka.coordinator.transaction;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+
 public class TransactionLogConfig {

Review Comment:
> Why do we need this? we usually don't add it if the config class just to group the props, docs and default values.

It prevents the class from being instantiated, which is a guideline from the book `Effective Java`. Anyway, it is just a code style suggestion, so it is fine to ignore it :)
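
The style suggestion above (a `final` class with a private constructor, so that a constants holder can be neither subclassed nor instantiated) can be sketched as follows. `TransactionLogConfigsExample` and `ConstantsClassCheck` are hypothetical names used only for illustration; the two constants are copied from the diff quoted in this thread.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;

// Hypothetical constants-only class: final, with a private constructor.
final class TransactionLogConfigsExample {
    static final String TRANSACTIONS_TOPIC_PARTITIONS_CONFIG = "transaction.state.log.num.partitions";
    static final int TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT = 50;

    // Never called; it exists only to suppress the implicit public constructor.
    private TransactionLogConfigsExample() { }
}

final class ConstantsClassCheck {
    private ConstantsClassCheck() { }

    // Returns true when the class is final and every declared constructor is private.
    static boolean isPureConstantsClass(Class<?> type) {
        if (!Modifier.isFinal(type.getModifiers())) {
            return false;
        }
        for (Constructor<?> constructor : type.getDeclaredConstructors()) {
            if (!Modifier.isPrivate(constructor.getModifiers())) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isPureConstantsClass(TransactionLogConfigsExample.class));
        System.out.println(isPureConstantsClass(String.class));
    }
}
```

Without the explicit private constructor the compiler generates a default one, so `new TransactionLogConfigsExample()` would compile from within the same package even though the object would carry no state.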
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
chia7712 commented on code in PR #15670:
URL: https://github.com/apache/kafka/pull/15670#discussion_r1556491908

## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ##
@@ -16,11 +16,43 @@
 */
 package org.apache.kafka.coordinator.transaction;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+
 public class TransactionLogConfig {
 // Log-level config default values
-public static final int DEFAULT_NUM_PARTITIONS = 50;
-public static final int DEFAULT_SEGMENT_BYTES = 100 * 1024 * 1024;
-public static final short DEFAULT_REPLICATION_FACTOR = 3;
-public static final int DEFAULT_MIN_IN_SYNC_REPLICAS = 2;
-public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024;
+public static final String TRANSACTIONS_TOPIC_PARTITIONS_CONFIG = "transaction.state.log.num.partitions";
+public static final int TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT = 50;
+public static final String TRANSACTIONS_TOPIC_PARTITIONS_DOC = "The number of partitions for the transaction topic (should not change after deployment).";
+
+public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG = "transaction.state.log.segment.bytes";
+public static final int TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 1024 * 1024;
+public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC = "The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads";
+
+public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG = "transaction.state.log.replication.factor";
+public static final short TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3;
+public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC = "The replication factor for the transaction topic (set higher to ensure availability). " +
+"Internal topic creation will fail until the cluster size meets this replication factor requirement.";
+
+public static final String TRANSACTIONS_TOPIC_MIN_ISR_CONFIG = "transaction.state.log.min.isr";
+public static final int TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT = 2;
+public static final String TRANSACTIONS_TOPIC_MIN_ISR_DOC = "Overridden " + ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) + " config for the transaction topic.";

Review Comment:
> If we want to remove the deps on TopicConfig we can hard code the value of TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG in doc but I personally would prefer that we keep the ref to the config.

I'm not sure whether we ought to highlight the "override". Other similar configs, for example `offsets.topic.replication.factor`, do not mention that "this" config overrides "that". Instead, the doc just says `The replication factor for the offsets topic ...`, which is good enough to understand the purpose of the config.
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
OmniaGM commented on code in PR #15670:
URL: https://github.com/apache/kafka/pull/15670#discussion_r1556482611

## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ##
@@ -16,11 +16,43 @@
 */
 package org.apache.kafka.coordinator.transaction;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+
 public class TransactionLogConfig {
 // Log-level config default values
-public static final int DEFAULT_NUM_PARTITIONS = 50;
-public static final int DEFAULT_SEGMENT_BYTES = 100 * 1024 * 1024;
-public static final short DEFAULT_REPLICATION_FACTOR = 3;
-public static final int DEFAULT_MIN_IN_SYNC_REPLICAS = 2;
-public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024;
+public static final String TRANSACTIONS_TOPIC_PARTITIONS_CONFIG = "transaction.state.log.num.partitions";
+public static final int TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT = 50;
+public static final String TRANSACTIONS_TOPIC_PARTITIONS_DOC = "The number of partitions for the transaction topic (should not change after deployment).";
+
+public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG = "transaction.state.log.segment.bytes";
+public static final int TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 1024 * 1024;
+public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC = "The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads";
+
+public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG = "transaction.state.log.replication.factor";
+public static final short TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3;
+public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC = "The replication factor for the transaction topic (set higher to ensure availability). " +
+"Internal topic creation will fail until the cluster size meets this replication factor requirement.";
+
+public static final String TRANSACTIONS_TOPIC_MIN_ISR_CONFIG = "transaction.state.log.min.isr";
+public static final int TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT = 2;
+public static final String TRANSACTIONS_TOPIC_MIN_ISR_DOC = "Overridden " + ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) + " config for the transaction topic.";
+
+public static final String TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG = "transaction.state.log.load.buffer.size";
+public static final int TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT = 5 * 1024 * 1024;
+public static final String TRANSACTIONS_LOAD_BUFFER_SIZE_DOC = "Batch size for reading from the transaction log segments when loading producer ids and transactions into the cache (soft-limit, overridden if records are too large).";
+
+public static final String TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG = ProducerStateManagerConfig.TRANSACTION_VERIFICATION_ENABLED;
+public static final boolean TRANSACTION_PARTITION_VERIFICATION_ENABLE_DEFAULT = true;
+public static final String TRANSACTION_PARTITION_VERIFICATION_ENABLE_DOC = "Enable verification that checks that the partition has been added to the transaction before writing transactional records to the partition";
+
+public static final String PRODUCER_ID_EXPIRATION_MS_CONFIG = ProducerStateManagerConfig.PRODUCER_ID_EXPIRATION_MS;

Review Comment:
Moved `TRANSACTION_VERIFICATION_ENABLED` and `PRODUCER_ID_EXPIRATION_MS` to `TransactionLogConfig`, where we have the docs and default values.
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
OmniaGM commented on code in PR #15670:
URL: https://github.com/apache/kafka/pull/15670#discussion_r1556478763

## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ##
@@ -16,11 +16,43 @@
 */
 package org.apache.kafka.coordinator.transaction;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+
 public class TransactionLogConfig {

Review Comment:
Adding "s" to the name makes sense, but it seems we don't have a single pattern in Kafka; for example, there are `SaslConfigs` and `TopicConfig`. I'll rename it, but we probably need to update https://issues.apache.org/jira/browse/KAFKA-14524 to add some guidance for future refactors.
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
OmniaGM commented on code in PR #15670:
URL: https://github.com/apache/kafka/pull/15670#discussion_r1556481647

## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ##
@@ -16,11 +16,43 @@
 */
 package org.apache.kafka.coordinator.transaction;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+
 public class TransactionLogConfig {

Review Comment:
> add private constructor

Why do we need this? We usually don't add it if the config class is just there to group the props, docs and default values.
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
OmniaGM commented on code in PR #15670:
URL: https://github.com/apache/kafka/pull/15670#discussion_r1556466418

## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ##
@@ -16,11 +16,43 @@
  */
 package org.apache.kafka.coordinator.transaction;
 
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+
 public class TransactionLogConfig {
     // Log-level config default values
-    public static final int DEFAULT_NUM_PARTITIONS = 50;
-    public static final int DEFAULT_SEGMENT_BYTES = 100 * 1024 * 1024;
-    public static final short DEFAULT_REPLICATION_FACTOR = 3;
-    public static final int DEFAULT_MIN_IN_SYNC_REPLICAS = 2;
-    public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024;
+    public static final String TRANSACTIONS_TOPIC_PARTITIONS_CONFIG = "transaction.state.log.num.partitions";
+    public static final int TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT = 50;
+    public static final String TRANSACTIONS_TOPIC_PARTITIONS_DOC = "The number of partitions for the transaction topic (should not change after deployment).";
+
+    public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG = "transaction.state.log.segment.bytes";
+    public static final int TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 1024 * 1024;
+    public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC = "The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads";
+
+    public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG = "transaction.state.log.replication.factor";
+    public static final short TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3;
+    public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC = "The replication factor for the transaction topic (set higher to ensure availability). " +
+        "Internal topic creation will fail until the cluster size meets this replication factor requirement.";
+
+    public static final String TRANSACTIONS_TOPIC_MIN_ISR_CONFIG = "transaction.state.log.min.isr";
+    public static final int TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT = 2;
+    public static final String TRANSACTIONS_TOPIC_MIN_ISR_DOC = "Overridden " + ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) + " config for the transaction topic.";

Review Comment:
We may not need `ServerTopicConfigSynonyms::serverSynonym`, since it just calls `ServerTopicConfigSynonyms.sameName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)`, which is the same as referring to `TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG` directly. If we want to remove the dependency on `TopicConfig`, we can hard-code the value of `TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG` in the doc, but I would personally prefer to keep the reference to the config.
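The two options weighed in this comment can be illustrated with a minimal, self-contained sketch. `TopicConfigStandIn` and `MinIsrDocSketch` are hypothetical names, used only so the example compiles without Kafka on the classpath; the stand-in is assumed to mirror the `min.insync.replicas` value of `TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG`.

```java
// Sketch only: compares a doc string built from a referenced constant
// with the same doc string written out by hand.
final class MinIsrDocSketch {
    // Option A: keep a reference to the topic-level config name.
    static final String DOC_WITH_REFERENCE =
        "Overridden " + TopicConfigStandIn.MIN_IN_SYNC_REPLICAS_CONFIG + " config for the transaction topic.";

    // Option B: hard-code the name so that no other class is referenced.
    static final String DOC_HARD_CODED =
        "Overridden min.insync.replicas config for the transaction topic.";

    private MinIsrDocSketch() { }

    public static void main(String[] args) {
        System.out.println(DOC_WITH_REFERENCE);
        System.out.println("same text: " + DOC_WITH_REFERENCE.equals(DOC_HARD_CODED));
    }
}

// Hypothetical stand-in for org.apache.kafka.common.config.TopicConfig;
// only the one constant needed here is mirrored.
final class TopicConfigStandIn {
    static final String MIN_IN_SYNC_REPLICAS_CONFIG = "min.insync.replicas";

    private TopicConfigStandIn() { }
}
```

Both variants produce the same text today. The referenced form would follow a rename of the topic-level config automatically, while the hard-coded form avoids the dependency at the cost of possible drift.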
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
chia7712 commented on code in PR #15670:
URL: https://github.com/apache/kafka/pull/15670#discussion_r1556404908

## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ##
@@ -16,11 +16,43 @@
  */
 package org.apache.kafka.coordinator.transaction;
 
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+
 public class TransactionLogConfig {

Review Comment:
To make it a pure constants class, could you please make the following changes?
1. Rename it to `TransactionLogConfigs`.
2. Add `final`.
3. Add a private constructor.

## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ##
@@ -16,11 +16,43 @@
  */
 package org.apache.kafka.coordinator.transaction;
 
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+
 public class TransactionLogConfig {
     // Log-level config default values
-    public static final int DEFAULT_NUM_PARTITIONS = 50;
-    public static final int DEFAULT_SEGMENT_BYTES = 100 * 1024 * 1024;
-    public static final short DEFAULT_REPLICATION_FACTOR = 3;
-    public static final int DEFAULT_MIN_IN_SYNC_REPLICAS = 2;
-    public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024;
+    public static final String TRANSACTIONS_TOPIC_PARTITIONS_CONFIG = "transaction.state.log.num.partitions";
+    public static final int TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT = 50;
+    public static final String TRANSACTIONS_TOPIC_PARTITIONS_DOC = "The number of partitions for the transaction topic (should not change after deployment).";
+
+    public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG = "transaction.state.log.segment.bytes";
+    public static final int TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 1024 * 1024;
+    public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC = "The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads";
+
+    public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG = "transaction.state.log.replication.factor";
+    public static final short TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3;
+    public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC = "The replication factor for the transaction topic (set higher to ensure availability). " +
+        "Internal topic creation will fail until the cluster size meets this replication factor requirement.";
+
+    public static final String TRANSACTIONS_TOPIC_MIN_ISR_CONFIG = "transaction.state.log.min.isr";
+    public static final int TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT = 2;
+    public static final String TRANSACTIONS_TOPIC_MIN_ISR_DOC = "Overridden " + ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) + " config for the transaction topic.";

Review Comment:
Maybe we can revise the docs to avoid depending on those modules directly?

## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ##
@@ -16,11 +16,43 @@
  */
 package org.apache.kafka.coordinator.transaction;
 
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+
 public class TransactionLogConfig {
     // Log-level config default values
-    public static final int DEFAULT_NUM_PARTITIONS = 50;
-    public static final int DEFAULT_SEGMENT_BYTES = 100 * 1024 * 1024;
-    public static final short DEFAULT_REPLICATION_FACTOR = 3;
-    public static final int DEFAULT_MIN_IN_SYNC_REPLICAS = 2;
-    public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024;
+    public static final String TRANSACTIONS_TOPIC_PARTITIONS_CONFIG = "transaction.state.log.num.partitions";
+    public static final int TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT = 50;
+    public static final String TRANSACTIONS_TOPIC_PARTITIONS_DOC = "The number of partitions for the transaction topic (should not change after deployment).";
+
+    public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG = "transaction.state.log.segment.bytes";
+    public static final int TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 1024 * 1024;
+    public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC = "The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads";
+
+    public static final String TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG = "transaction.state.log.replication.factor";
+publ
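The three requested changes (rename, `final`, private constructor) could look roughly like the sketch below. It copies only a few of the constants from the diff above, and the class is package-private here purely so the example fits in one file; in the real module it would presumably stay `public`. The `main` method is an addition for illustration, not part of the PR.

```java
import java.lang.reflect.Modifier;

// Sketch of a "pure constants" holder: final, non-instantiable, static fields only.
final class TransactionLogConfigs {
    public static final String TRANSACTIONS_TOPIC_PARTITIONS_CONFIG = "transaction.state.log.num.partitions";
    public static final int TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT = 50;

    public static final String TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG = "transaction.state.log.segment.bytes";
    public static final int TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 1024 * 1024;

    public static final String TRANSACTIONS_TOPIC_MIN_ISR_CONFIG = "transaction.state.log.min.isr";
    public static final int TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT = 2;

    // Private constructor: nobody outside this class can create an instance.
    private TransactionLogConfigs() { }

    // Illustration only: report the class and constructor modifiers via reflection.
    public static void main(String[] args) {
        int classMods = TransactionLogConfigs.class.getModifiers();
        int ctorMods = TransactionLogConfigs.class.getDeclaredConstructors()[0].getModifiers();
        System.out.println("final class: " + Modifier.isFinal(classMods));
        System.out.println("private constructor: " + Modifier.isPrivate(ctorMods));
        System.out.println(TRANSACTIONS_TOPIC_PARTITIONS_CONFIG + " default = " + TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT);
    }
}
```

Marking the class `final` and hiding the constructor signals that it holds names, defaults, and docs only, so callers reference the constants statically rather than instantiating or subclassing it.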
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
OmniaGM commented on code in PR #15670:
URL: https://github.com/apache/kafka/pull/15670#discussion_r1555661766

## core/src/main/scala/kafka/server/KafkaConfig.scala: ##
@@ -1062,21 +1027,21 @@ object KafkaConfig {
       .define(CompressionTypeProp, STRING, LogConfig.DEFAULT_COMPRESSION_TYPE, in(BrokerCompressionType.names.asScala.toSeq:_*), HIGH, CompressionTypeDoc)
 
       /** * Transaction management configuration ***/
-      .define(TransactionalIdExpirationMsProp, INT, Defaults.TRANSACTIONAL_ID_EXPIRATION_MS, atLeast(1), HIGH, TransactionalIdExpirationMsDoc)
-      .define(TransactionsMaxTimeoutMsProp, INT, Defaults.TRANSACTIONS_MAX_TIMEOUT_MS, atLeast(1), HIGH, TransactionsMaxTimeoutMsDoc)
-      .define(TransactionsTopicMinISRProp, INT, Defaults.TRANSACTIONS_TOPIC_MIN_ISR, atLeast(1), HIGH, TransactionsTopicMinISRDoc)
-      .define(TransactionsLoadBufferSizeProp, INT, Defaults.TRANSACTIONS_LOAD_BUFFER_SIZE, atLeast(1), HIGH, TransactionsLoadBufferSizeDoc)
-      .define(TransactionsTopicReplicationFactorProp, SHORT, Defaults.TRANSACTIONS_TOPIC_REPLICATION_FACTOR, atLeast(1), HIGH, TransactionsTopicReplicationFactorDoc)
-      .define(TransactionsTopicPartitionsProp, INT, Defaults.TRANSACTIONS_TOPIC_PARTITIONS, atLeast(1), HIGH, TransactionsTopicPartitionsDoc)
-      .define(TransactionsTopicSegmentBytesProp, INT, Defaults.TRANSACTIONS_TOPIC_SEGMENT_BYTES, atLeast(1), HIGH, TransactionsTopicSegmentBytesDoc)
-      .define(TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, INT, Defaults.TRANSACTIONS_ABORT_TIMED_OUT_CLEANUP_INTERVAL_MS, atLeast(1), LOW, TransactionsAbortTimedOutTransactionsIntervalMsDoc)
-      .define(TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp, INT, Defaults.TRANSACTIONS_REMOVE_EXPIRED_CLEANUP_INTERVAL_MS, atLeast(1), LOW, TransactionsRemoveExpiredTransactionsIntervalMsDoc)
-
-      .define(TransactionPartitionVerificationEnableProp, BOOLEAN, Defaults.TRANSACTION_PARTITION_VERIFICATION_ENABLE, LOW, TransactionPartitionVerificationEnableDoc)
-
-      .define(ProducerIdExpirationMsProp, INT, Defaults.PRODUCER_ID_EXPIRATION_MS, atLeast(1), LOW, ProducerIdExpirationMsDoc)
+      .define(TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, INT, Defaults.TRANSACTIONAL_ID_EXPIRATION_MS, atLeast(1), HIGH, TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_DOC)

Review Comment:
Good call, pushed a refactor for this.
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
chia7712 commented on code in PR #15670:
URL: https://github.com/apache/kafka/pull/15670#discussion_r1554282432

## core/src/main/scala/kafka/server/DynamicBrokerConfig.scala: ##
@@ -1136,18 +1137,18 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
 class DynamicProducerStateManagerConfig(val producerStateManagerConfig: ProducerStateManagerConfig) extends BrokerReconfigurable with Logging {
   def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
     if (producerStateManagerConfig.producerIdExpirationMs != newConfig.producerIdExpirationMs) {
-      info(s"Reconfigure ${KafkaConfig.ProducerIdExpirationMsProp} from ${producerStateManagerConfig.producerIdExpirationMs} to ${newConfig.producerIdExpirationMs}")
+      info(s"Reconfigure ${TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG} from ${producerStateManagerConfig.producerIdExpirationMs} to ${newConfig.producerIdExpirationMs}")
       producerStateManagerConfig.setProducerIdExpirationMs(newConfig.producerIdExpirationMs)
     }
     if (producerStateManagerConfig.transactionVerificationEnabled != newConfig.transactionPartitionVerificationEnable) {
-      info(s"Reconfigure ${KafkaConfig.TransactionPartitionVerificationEnableProp} from ${producerStateManagerConfig.transactionVerificationEnabled} to ${newConfig.transactionPartitionVerificationEnable}")
+      info(s"Reconfigure ${TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG} from ${producerStateManagerConfig.transactionVerificationEnabled} to ${newConfig.transactionPartitionVerificationEnable}")
       producerStateManagerConfig.setTransactionVerificationEnabled(newConfig.transactionPartitionVerificationEnable)
     }
   }
   def validateReconfiguration(newConfig: KafkaConfig): Unit = {
     if (newConfig.producerIdExpirationMs < 0)
-      throw new ConfigException(s"${KafkaConfig.ProducerIdExpirationMsProp} cannot be less than 0, current value is ${producerStateManagerConfig.producerIdExpirationMs}, and new value is ${newConfig.producerIdExpirationMs}")
+      throw new ConfigException(s"${TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG} cannot be less than 0, current value is ${producerStateManagerConfig.producerIdExpirationMs}, and new value is ${newConfig.producerIdExpirationMs}")

Review Comment:
Maybe `ProducerStateManagerConfig.PRODUCER_ID_EXPIRATION_MS` is more suitable in this case, because the code is used for logging `Producer` state.

## core/src/main/scala/kafka/server/KafkaConfig.scala: ##
@@ -1062,21 +1027,21 @@ object KafkaConfig {
       .define(CompressionTypeProp, STRING, LogConfig.DEFAULT_COMPRESSION_TYPE, in(BrokerCompressionType.names.asScala.toSeq:_*), HIGH, CompressionTypeDoc)
 
       /** * Transaction management configuration ***/
-      .define(TransactionalIdExpirationMsProp, INT, Defaults.TRANSACTIONAL_ID_EXPIRATION_MS, atLeast(1), HIGH, TransactionalIdExpirationMsDoc)
-      .define(TransactionsMaxTimeoutMsProp, INT, Defaults.TRANSACTIONS_MAX_TIMEOUT_MS, atLeast(1), HIGH, TransactionsMaxTimeoutMsDoc)
-      .define(TransactionsTopicMinISRProp, INT, Defaults.TRANSACTIONS_TOPIC_MIN_ISR, atLeast(1), HIGH, TransactionsTopicMinISRDoc)
-      .define(TransactionsLoadBufferSizeProp, INT, Defaults.TRANSACTIONS_LOAD_BUFFER_SIZE, atLeast(1), HIGH, TransactionsLoadBufferSizeDoc)
-      .define(TransactionsTopicReplicationFactorProp, SHORT, Defaults.TRANSACTIONS_TOPIC_REPLICATION_FACTOR, atLeast(1), HIGH, TransactionsTopicReplicationFactorDoc)
-      .define(TransactionsTopicPartitionsProp, INT, Defaults.TRANSACTIONS_TOPIC_PARTITIONS, atLeast(1), HIGH, TransactionsTopicPartitionsDoc)
-      .define(TransactionsTopicSegmentBytesProp, INT, Defaults.TRANSACTIONS_TOPIC_SEGMENT_BYTES, atLeast(1), HIGH, TransactionsTopicSegmentBytesDoc)
-      .define(TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, INT, Defaults.TRANSACTIONS_ABORT_TIMED_OUT_CLEANUP_INTERVAL_MS, atLeast(1), LOW, TransactionsAbortTimedOutTransactionsIntervalMsDoc)
-      .define(TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp, INT, Defaults.TRANSACTIONS_REMOVE_EXPIRED_CLEANUP_INTERVAL_MS, atLeast(1), LOW, TransactionsRemoveExpiredTransactionsIntervalMsDoc)
-
-      .define(TransactionPartitionVerificationEnableProp, BOOLEAN, Defaults.TRANSACTION_PARTITION_VERIFICATION_ENABLE, LOW, TransactionPartitionVerificationEnableDoc)
-
-      .define(ProducerIdExpirationMsProp, INT, Defaults.PRODUCER_ID_EXPIRATION_MS, atLeast(1), LOW, ProducerIdExpirationMsDoc)
+      .define(TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, INT, Defaults.TRANSACTIONAL_ID_EXPIRATION_MS, atLeast(1), HIGH, TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_DOC)

Review Comment:
Should we do a similar refactor for `Defaults`?
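The validate-then-reconfigure pattern in the `DynamicBrokerConfig.scala` hunk above can be sketched in plain Java as follows. This is an illustration under stated assumptions, not the broker's implementation: `ProducerIdExpirationReconfigSketch` is a hypothetical class, `IllegalArgumentException` stands in for Kafka's `ConfigException`, `System.out` stands in for the broker logger, and the config name `producer.id.expiration.ms` is assumed here. Which class should own that constant is exactly what the review comment discusses.

```java
// Sketch of a dynamically reconfigurable setting: validate first, then apply and log only on change.
final class ProducerIdExpirationReconfigSketch {
    // Assumed config name; the owning constants class is still under discussion in the thread.
    static final String PRODUCER_ID_EXPIRATION_MS_CONFIG = "producer.id.expiration.ms";

    private int producerIdExpirationMs;

    ProducerIdExpirationReconfigSketch(int initialMs) {
        this.producerIdExpirationMs = initialMs;
    }

    int producerIdExpirationMs() {
        return producerIdExpirationMs;
    }

    // Rejects negative values before anything is applied.
    void validateReconfiguration(int newMs) {
        if (newMs < 0) {
            throw new IllegalArgumentException(PRODUCER_ID_EXPIRATION_MS_CONFIG
                + " cannot be less than 0, current value is " + producerIdExpirationMs
                + ", and new value is " + newMs);
        }
    }

    // Applies the new value; returns true only if the value actually changed.
    boolean reconfigure(int newMs) {
        if (producerIdExpirationMs == newMs) {
            return false;
        }
        System.out.println("Reconfigure " + PRODUCER_ID_EXPIRATION_MS_CONFIG
            + " from " + producerIdExpirationMs + " to " + newMs);
        producerIdExpirationMs = newMs;
        return true;
    }

    public static void main(String[] args) {
        ProducerIdExpirationReconfigSketch cfg = new ProducerIdExpirationReconfigSketch(1000);
        cfg.validateReconfiguration(2000);
        cfg.reconfigure(2000);
        try {
            cfg.validateReconfiguration(-1);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Because the log and error messages embed the config name, moving the constant to another class changes only the reference, not the message text, provided the string value stays the same.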
[PR] KAFKA-15853: Move transactions configs out of core [kafka]
OmniaGM opened a new pull request, #15670:
URL: https://github.com/apache/kafka/pull/15670

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)