chia7712 commented on code in PR #16665:
URL: https://github.com/apache/kafka/pull/16665#discussion_r1709903239


##########
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java:
##########
@@ -61,15 +62,67 @@ public final class TransactionLogConfigs {
     public static final int PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT = 
600000;
     public static final String PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC = 
"The interval at which to remove producer IDs that have expired due to 
<code>producer.id.expiration.ms</code> passing.";
     public static final ConfigDef CONFIG_DEF =  new ConfigDef()
-            .define(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 
INT, TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT, atLeast(1), 
HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_DOC)
-            
.define(TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG, INT, 
TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, 
TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_DOC)
-            
.define(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 
SHORT, TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT, 
atLeast(1), HIGH, 
TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC)
-            
.define(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, INT, 
TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH, 
TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_DOC)
-            
.define(TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG, INT, 
TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), 
HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC)
+            .define(TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, INT, 
TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT, atLeast(1), HIGH, 
TRANSACTIONS_TOPIC_MIN_ISR_DOC)
+            .define(TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG, INT, 
TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, 
TRANSACTIONS_LOAD_BUFFER_SIZE_DOC)
+            .define(TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, 
TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, 
TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC)
+            .define(TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, INT, 
TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH, 
TRANSACTIONS_TOPIC_PARTITIONS_DOC)
+            .define(TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG, INT, 
TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH, 
TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC)
 
-            
.define(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, 
BOOLEAN, 
TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_DEFAULT, LOW, 
TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_DOC)
+            .define(TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, BOOLEAN, 
TRANSACTION_PARTITION_VERIFICATION_ENABLE_DEFAULT, LOW, 
TRANSACTION_PARTITION_VERIFICATION_ENABLE_DOC)
 
-            .define(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG, 
INT, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, atLeast(1), LOW, 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DOC)
+            .define(PRODUCER_ID_EXPIRATION_MS_CONFIG, INT, 
PRODUCER_ID_EXPIRATION_MS_DEFAULT, atLeast(1), LOW, 
PRODUCER_ID_EXPIRATION_MS_DOC)
             // Configuration for testing only as default value should be 
sufficient for typical usage
-            
.defineInternal(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG,
 INT, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, 
atLeast(1), LOW, 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC);
+            .defineInternal(PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, 
INT, PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), LOW, 
PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC);
+
+    private AbstractConfig config;
+    private int transactionTopicMinISR;
+    private int transactionsLoadBufferSize;
+    private short transactionTopicReplicationFactor;
+    private int transactionTopicPartitions;
+    private int transactionTopicSegmentBytes;
+    private int producerIdExpirationCheckIntervalMs;
+
+    public TransactionLogConfig(AbstractConfig config) {
+        this.config = config;
+        this.transactionTopicMinISR = 
config.getInt(TRANSACTIONS_TOPIC_MIN_ISR_CONFIG);
+        this.transactionsLoadBufferSize = 
config.getInt(TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG);
+        this.transactionTopicReplicationFactor = 
config.getShort(TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG);
+        this.transactionTopicPartitions = 
config.getInt(TRANSACTIONS_TOPIC_PARTITIONS_CONFIG);
+        this.transactionTopicSegmentBytes = 
config.getInt(TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG);
+        this.producerIdExpirationCheckIntervalMs = 
config.getInt(PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG);
+    }
+
+    public int transactionTopicMinISR() {
+        return transactionTopicMinISR;
+    }
+
+    public int transactionsLoadBufferSize() {
+        return transactionsLoadBufferSize;
+    }
+
+    public short transactionTopicReplicationFactor() {
+        return transactionTopicReplicationFactor;
+    }
+
+    public int transactionTopicPartitions() {
+        return transactionTopicPartitions;
+    }
+
+    public int transactionTopicSegmentBytes() {
+        return transactionTopicSegmentBytes;
+    }
+
+    public Boolean transactionPartitionVerificationEnable() {
+        return 
config.getBoolean(TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG);

Review Comment:
   We had a discussion about this before 
(https://github.com/apache/kafka/pull/16458#discussion_r1662130270).
   
   `AbstractConfig` is read-only, but the subclass `KafkaConfig` can update 
its inner configs dynamically (see 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala#L192).
 Hence, the passed `AbstractConfig` always has "up-to-date configs".
   
   There is a Jira ticket that aims to refactor this 
(https://issues.apache.org/jira/browse/KAFKA-17001), but I haven't worked on it 
because it would introduce major changes to the Scala code.
   
   In short, the basic rules for now are shown below.
   
   1. Non-dynamic configs are evaluated in the constructor.
   2. Dynamic configs are evaluated in the getter (as `AbstractConfig` 
always has the latest configs).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to