[ 
https://issues.apache.org/jira/browse/KAFKA-14485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17941224#comment-17941224
 ] 

Chia-Ping Tsai commented on KAFKA-14485:
----------------------------------------

 
{code:java}
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 697d6a3014..ed8140e596 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.{AddPartitionsToTxnResponse, 
TransactionResult}
 import org.apache.kafka.common.utils.{LogContext, ProducerIdAndEpoch, Time}
-import org.apache.kafka.coordinator.transaction.ProducerIdManager
+import org.apache.kafka.coordinator.transaction.{ProducerIdManager, 
TransactionLogConfig}
 import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
 import org.apache.kafka.server.util.Scheduler
@@ -46,13 +46,14 @@ object TransactionCoordinator {
             metadataCache: MetadataCache,
             time: Time): TransactionCoordinator = {
 
+    val transactionLogConfig = new TransactionLogConfig(config)
     val txnConfig = 
TransactionConfig(config.transactionStateManagerConfig.transactionalIdExpirationMs,
       config.transactionStateManagerConfig.transactionMaxTimeoutMs,
-      config.transactionLogConfig.transactionTopicPartitions,
-      config.transactionLogConfig.transactionTopicReplicationFactor,
-      config.transactionLogConfig.transactionTopicSegmentBytes,
-      config.transactionLogConfig.transactionLoadBufferSize,
-      config.transactionLogConfig.transactionTopicMinISR,
+      transactionLogConfig.transactionTopicPartitions,
+      transactionLogConfig.transactionTopicReplicationFactor,
+      transactionLogConfig.transactionTopicSegmentBytes,
+      transactionLogConfig.transactionLoadBufferSize,
+      transactionLogConfig.transactionTopicMinISR,
       
config.transactionStateManagerConfig.transactionAbortTimedOutTransactionCleanupIntervalMs,
       
config.transactionStateManagerConfig.transactionRemoveExpiredTransactionalIdCleanupIntervalMs,
       config.transactionStateManagerConfig.transaction2PCEnabled,
diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index 2bafe73528..b33657ce97 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -29,6 +29,7 @@ import kafka.utils.{CoreUtils, Logging, Pool}
 import org.apache.kafka.common.{DirectoryId, KafkaException, TopicPartition, 
Uuid}
 import org.apache.kafka.common.utils.{Exit, KafkaThread, Time, Utils}
 import org.apache.kafka.common.errors.{InconsistentTopicIdException, 
KafkaStorageException, LogDirNotFoundException}
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 
 import scala.jdk.CollectionConverters._
 import scala.collection._
@@ -1549,6 +1550,8 @@ object LogManager {
 
     val cleanerConfig = LogCleaner.cleanerConfig(config)
 
+    val transactionLogConfig = new TransactionLogConfig(config)
+
     new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile),
       initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile),
       configRepository = configRepository,
@@ -1560,8 +1563,8 @@ object LogManager {
       flushStartOffsetCheckpointMs = 
config.logFlushStartOffsetCheckpointIntervalMs,
       retentionCheckMs = config.logCleanupIntervalMs,
       maxTransactionTimeoutMs = 
config.transactionStateManagerConfig.transactionMaxTimeoutMs,
-      producerStateManagerConfig = new 
ProducerStateManagerConfig(config.transactionLogConfig.producerIdExpirationMs, 
config.transactionLogConfig.transactionPartitionVerificationEnable),
-      producerIdExpirationCheckIntervalMs = 
config.transactionLogConfig.producerIdExpirationCheckIntervalMs,
+      producerStateManagerConfig = new 
ProducerStateManagerConfig(transactionLogConfig.producerIdExpirationMs, 
transactionLogConfig.transactionPartitionVerificationEnable),
+      producerIdExpirationCheckIntervalMs = 
transactionLogConfig.producerIdExpirationCheckIntervalMs,
       scheduler = kafkaScheduler,
       brokerTopicStats = brokerTopicStats,
       logDirFailureChannel = logDirFailureChannel,
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala 
b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index 85308473bf..a2c2bd4d80 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{CreateTopicsRequest, RequestContext, 
RequestHeader}
 import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.coordinator.share.ShareCoordinator
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, 
NodeToControllerChannelManager}
 
 import scala.collection.{Map, Seq, Set, mutable}
@@ -189,10 +190,11 @@ class DefaultAutoTopicCreationManager(
           
.setReplicationFactor(config.groupCoordinatorConfig.offsetsTopicReplicationFactor)
           
.setConfigs(convertToTopicConfigCollections(groupCoordinator.groupMetadataTopicConfigs))
       case TRANSACTION_STATE_TOPIC_NAME =>
+        val transactionLogConfig = new TransactionLogConfig(config)
         new CreatableTopic()
           .setName(topic)
-          
.setNumPartitions(config.transactionLogConfig.transactionTopicPartitions)
-          
.setReplicationFactor(config.transactionLogConfig.transactionTopicReplicationFactor)
+          .setNumPartitions(transactionLogConfig.transactionTopicPartitions)
+          
.setReplicationFactor(transactionLogConfig.transactionTopicReplicationFactor)
           .setConfigs(convertToTopicConfigCollections(
             txnCoordinator.transactionTopicConfigs))
       case SHARE_GROUP_STATE_TOPIC_NAME =>
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 2b8be8518b..87bfd9cd21 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.coordinator.group.Group.GroupType
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
 import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
-import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, 
TransactionLogConfig, TransactionStateManagerConfig}
+import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, 
TransactionStateManagerConfig}
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.security.authorizer.AuthorizerUtils
@@ -204,7 +204,6 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
   private val _shareCoordinatorConfig = new ShareCoordinatorConfig(this)
   def shareCoordinatorConfig: ShareCoordinatorConfig = _shareCoordinatorConfig
 
-  override val transactionLogConfig = new TransactionLogConfig(this)
   private val _transactionStateManagerConfig = new 
TransactionStateManagerConfig(this)
   private val _addPartitionsToTxnConfig = new AddPartitionsToTxnConfig(this)
   def transactionStateManagerConfig: TransactionStateManagerConfig = 
_transactionStateManagerConfig
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 6ff88869d9..751f0cad59 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -47,6 +47,7 @@ import 
org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.{Exit, Time, Utils}
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.metadata.MetadataCache
@@ -1037,9 +1038,10 @@ class ReplicaManager(val config: KafkaConfig,
     callback: ((Map[TopicPartition, Errors], Map[TopicPartition, 
VerificationGuard])) => Unit,
     transactionSupportedOperation: TransactionSupportedOperation
   ): Unit = {
+    val transactionLogConfig = new TransactionLogConfig(config)
     // Skip verification if the request is not transactional or transaction 
verification is disabled.
     if (transactionalId == null ||
-      (!config.transactionLogConfig.transactionPartitionVerificationEnable && 
!transactionSupportedOperation.supportsEpochBump)
+      (!transactionLogConfig.transactionPartitionVerificationEnable && 
!transactionSupportedOperation.supportsEpochBump)
       || addPartitionsToTxnManager.isEmpty
     ) {
       callback((Map.empty[TopicPartition, Errors], Map.empty[TopicPartition, 
VerificationGuard]))
diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index ead385e744..de8f16e1e5 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.coordinator.share.ShareCoordinator
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.image.loader.LoaderManifest
 import org.apache.kafka.image.publisher.MetadataPublisher
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
@@ -331,9 +332,10 @@ class BrokerMetadataPublisher(
       case t: Throwable => fatalFaultHandler.handleFault("Error starting 
GroupCoordinator", t)
     }
     try {
+      val transactionLogConfig = new TransactionLogConfig(config)
       // Start the transaction coordinator.
       txnCoordinator.startup(() => metadataCache.numPartitions(
-        
Topic.TRANSACTION_STATE_TOPIC_NAME).orElse(config.transactionLogConfig.transactionTopicPartitions))
+        
Topic.TRANSACTION_STATE_TOPIC_NAME).orElse(transactionLogConfig.transactionTopicPartitions))
     } catch {
       case t: Throwable => fatalFaultHandler.handleFault("Error starting 
TransactionCoordinator", t)
     }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index e014f255d4..f1040bab42 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2523,7 +2523,8 @@ class ReplicaManagerTest {
       val props = new Properties()
       
props.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG,
 "true")
       config.dynamicConfig.updateBrokerConfig(config.brokerId, props)
-      TestUtils.waitUntilTrue(() => 
config.transactionLogConfig.transactionPartitionVerificationEnable, "Config did 
not dynamically update.")
+      val transactionLogConfig = new TransactionLogConfig(config)
+      TestUtils.waitUntilTrue(() => 
transactionLogConfig.transactionPartitionVerificationEnable, "Config did not 
dynamically update.")
 
       // Try to append more records. We don't need to send a request since the 
transaction is already ongoing.
       val moreTransactionalRecords = 
MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, 
producerEpoch, sequence + 1,
@@ -2575,7 +2576,8 @@ class ReplicaManagerTest {
       val props = new Properties()
       
props.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG,
 "false")
       config.dynamicConfig.updateBrokerConfig(config.brokerId, props)
-      TestUtils.waitUntilTrue(() => 
!config.transactionLogConfig.transactionPartitionVerificationEnable, "Config 
did not dynamically update.")
+      val transactionLogConfig = new TransactionLogConfig(config)
+      TestUtils.waitUntilTrue(() => 
!transactionLogConfig.transactionPartitionVerificationEnable, "Config did not 
dynamically update.")
 
       // Confirm we did not write to the log and instead returned error.
       val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/BrokerReconfigurable.java 
b/server-common/src/main/java/org/apache/kafka/server/config/BrokerReconfigurable.java
similarity index 89%
rename from 
server/src/main/java/org/apache/kafka/server/config/BrokerReconfigurable.java
rename to 
server-common/src/main/java/org/apache/kafka/server/config/BrokerReconfigurable.java
index 2b3fe9dcf3..864f5c2e24 100644
--- 
a/server/src/main/java/org/apache/kafka/server/config/BrokerReconfigurable.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/config/BrokerReconfigurable.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.server.config;
 
+import org.apache.kafka.common.config.AbstractConfig;
+
 import java.util.Set;
 
 /**
@@ -27,13 +29,12 @@ import java.util.Set;
  * The reconfiguration process follows three steps:
  * <ol>
  *   <li>Determining which configurations can be dynamically updated via 
{@link #reconfigurableConfigs()}</li>
- *   <li>Validating the new configuration before applying it via {@link 
#validateReconfiguration(AbstractKafkaConfig)}</li>
- *   <li>Applying the new configuration via {@link 
#reconfigure(AbstractKafkaConfig, AbstractKafkaConfig)}</li>
+ *   <li>Validating the new configuration before applying it via {@link 
#validateReconfiguration(AbstractConfig)}</li>
+ *   <li>Applying the new configuration via {@link 
#reconfigure(AbstractConfig, AbstractConfig)}</li>
  * </ol>
  * <strong>Note: Since Kafka is eliminating Scala, developers should implement 
this interface instead of {@link kafka.server.BrokerReconfigurable}</strong>
  *
  *
- * @see AbstractKafkaConfig
  */
 public interface BrokerReconfigurable {
     /**
@@ -55,7 +56,7 @@ public interface BrokerReconfigurable {
      *
      * @param newConfig the new configuration to validate
      */
-    void validateReconfiguration(AbstractKafkaConfig newConfig);
+    void validateReconfiguration(AbstractConfig newConfig);
 
     /**
      * Applies the new configuration.
@@ -65,5 +66,5 @@ public interface BrokerReconfigurable {
      * @param oldConfig the previous configuration
      * @param newConfig the new configuration to apply
      */
-    void reconfigure(AbstractKafkaConfig oldConfig, AbstractKafkaConfig 
newConfig);
+    void reconfigure(AbstractConfig oldConfig, AbstractConfig newConfig);
 }
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java 
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index fc6906b96d..87bf18a412 100644
--- 
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++ 
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -83,6 +83,4 @@ public abstract class AbstractKafkaConfig extends 
AbstractConfig {
     public int backgroundThreads() {
         return getInt(ServerConfigs.BACKGROUND_THREADS_CONFIG);
     }
-
-    public abstract TransactionLogConfig transactionLogConfig();
 }
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java
 
b/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java
index 4158194f00..a09e12d951 100644
--- 
a/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java
+++ 
b/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.server.config;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
 import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
@@ -39,16 +40,16 @@ public class DynamicProducerStateManagerConfig implements 
BrokerReconfigurable {
     }
 
     @Override
-    public void validateReconfiguration(AbstractKafkaConfig newConfig) {
-        TransactionLogConfig transactionLogConfig = 
newConfig.transactionLogConfig();
+    public void validateReconfiguration(AbstractConfig newConfig) {
+        TransactionLogConfig transactionLogConfig = new 
TransactionLogConfig(newConfig);
         if (transactionLogConfig.producerIdExpirationMs() < 0)
             throw new 
ConfigException(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG + "cannot 
be less than 0, current value is " +
                                       
producerStateManagerConfig.producerIdExpirationMs() + ", and new value is " + 
transactionLogConfig.producerIdExpirationMs());
     }
 
     @Override
-    public void reconfigure(AbstractKafkaConfig oldConfig, AbstractKafkaConfig 
newConfig) {
-        TransactionLogConfig transactionLogConfig = 
newConfig.transactionLogConfig();
+    public void reconfigure(AbstractConfig oldConfig, AbstractConfig 
newConfig) {
+        TransactionLogConfig transactionLogConfig = new 
TransactionLogConfig(newConfig);
         if (producerStateManagerConfig.producerIdExpirationMs() != 
transactionLogConfig.producerIdExpirationMs()) {
             log.info("Reconfigure {} from {} to {}",
                 TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG, {code}
I haven't run the tests for the above patch, but it can decouple 
BrokerReconfigurable from AbstractKafkaConfig.

 

> Move LogCleaner to storage module
> ---------------------------------
>
>                 Key: KAFKA-14485
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14485
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Ismael Juma
>            Assignee: Dmitry Werner
>            Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to