chia7712 commented on code in PR #16665: URL: https://github.com/apache/kafka/pull/16665#discussion_r1709903239
########## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ########## @@ -61,15 +62,67 @@ public final class TransactionLogConfigs { public static final int PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT = 600000; public static final String PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC = "The interval at which to remove producer IDs that have expired due to <code>producer.id.expiration.ms</code> passing."; public static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, INT, TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_DOC) - .define(TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG, INT, TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_DOC) - .define(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC) - .define(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, INT, TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_DOC) - .define(TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG, INT, TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC) + .define(TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, INT, TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT, atLeast(1), HIGH, TRANSACTIONS_TOPIC_MIN_ISR_DOC) + .define(TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG, INT, TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, TRANSACTIONS_LOAD_BUFFER_SIZE_DOC) + .define(TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC) + .define(TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, INT, TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH, TRANSACTIONS_TOPIC_PARTITIONS_DOC) + .define(TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG, INT, TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH, TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC) - .define(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, BOOLEAN, TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_DEFAULT, LOW, TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_DOC) + .define(TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, BOOLEAN, TRANSACTION_PARTITION_VERIFICATION_ENABLE_DEFAULT, LOW, TRANSACTION_PARTITION_VERIFICATION_ENABLE_DOC) - .define(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG, INT, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, atLeast(1), LOW, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DOC) + .define(PRODUCER_ID_EXPIRATION_MS_CONFIG, INT, PRODUCER_ID_EXPIRATION_MS_DEFAULT, atLeast(1), LOW, PRODUCER_ID_EXPIRATION_MS_DOC) // Configuration for testing only as default value should be sufficient for typical usage - .defineInternal(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, INT, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), LOW, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC); + .defineInternal(PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, INT, PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), LOW, PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC); + + private AbstractConfig config; + private int transactionTopicMinISR; + private int transactionsLoadBufferSize; + private short transactionTopicReplicationFactor; + private int transactionTopicPartitions; + private int transactionTopicSegmentBytes; + private int producerIdExpirationCheckIntervalMs; + + public TransactionLogConfig(AbstractConfig config) { + this.config = config; + this.transactionTopicMinISR = config.getInt(TRANSACTIONS_TOPIC_MIN_ISR_CONFIG); + this.transactionsLoadBufferSize = config.getInt(TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG); + this.transactionTopicReplicationFactor = config.getShort(TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG); + this.transactionTopicPartitions = config.getInt(TRANSACTIONS_TOPIC_PARTITIONS_CONFIG); + this.transactionTopicSegmentBytes = config.getInt(TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG); + this.producerIdExpirationCheckIntervalMs = config.getInt(PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG); + } + + public int transactionTopicMinISR() { + return transactionTopicMinISR; + } + + public int transactionsLoadBufferSize() { + return transactionsLoadBufferSize; + } + + public short transactionTopicReplicationFactor() { + return transactionTopicReplicationFactor; + } + + public int transactionTopicPartitions() { + return transactionTopicPartitions; + } + + public int transactionTopicSegmentBytes() { + return transactionTopicSegmentBytes; + } + + public Boolean transactionPartitionVerificationEnable() { + return config.getBoolean(TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG); Review Comment: we had a discussion for that before (https://github.com/apache/kafka/pull/16458#discussion_r1662130270) `AbstractConfig` is read-only, but the sub class `KafkaConfig` can update inner configs dynamically (see https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala#L192). Hence, the passed `AbstractConfig` always have "up-to-date configs" There is a jira which tries to refactor it (https://issues.apache.org/jira/browse/KAFKA-17001), but I haven't worked on it because that will introduce major changes to scala code. In short, the basic rules for now are shown below. 1. the non-dynamic config will be evaluated in constructor. 2. the dynamic config will be evaluated in getter (as `AbstractConfig` always have latest configs) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org