[ https://issues.apache.org/jira/browse/KAFKA-14485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17941224#comment-17941224 ]
Chia-Ping Tsai commented on KAFKA-14485:
----------------------------------------
{code:java}
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 697d6a3014..ed8140e596 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.{AddPartitionsToTxnResponse, TransactionResult}
 import org.apache.kafka.common.utils.{LogContext, ProducerIdAndEpoch, Time}
-import org.apache.kafka.coordinator.transaction.ProducerIdManager
+import org.apache.kafka.coordinator.transaction.{ProducerIdManager, TransactionLogConfig}
 import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
 import org.apache.kafka.server.util.Scheduler
@@ -46,13 +46,14 @@ object TransactionCoordinator {
             metadataCache: MetadataCache,
             time: Time): TransactionCoordinator = {
 
+    val transactionLogConfig = new TransactionLogConfig(config)
     val txnConfig = TransactionConfig(config.transactionStateManagerConfig.transactionalIdExpirationMs,
       config.transactionStateManagerConfig.transactionMaxTimeoutMs,
-      config.transactionLogConfig.transactionTopicPartitions,
-      config.transactionLogConfig.transactionTopicReplicationFactor,
-      config.transactionLogConfig.transactionTopicSegmentBytes,
-      config.transactionLogConfig.transactionLoadBufferSize,
-      config.transactionLogConfig.transactionTopicMinISR,
+      transactionLogConfig.transactionTopicPartitions,
+      transactionLogConfig.transactionTopicReplicationFactor,
+      transactionLogConfig.transactionTopicSegmentBytes,
+      transactionLogConfig.transactionLoadBufferSize,
+      transactionLogConfig.transactionTopicMinISR,
       config.transactionStateManagerConfig.transactionAbortTimedOutTransactionCleanupIntervalMs,
       config.transactionStateManagerConfig.transactionRemoveExpiredTransactionalIdCleanupIntervalMs,
       config.transactionStateManagerConfig.transaction2PCEnabled,
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 2bafe73528..b33657ce97 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -29,6 +29,7 @@ import kafka.utils.{CoreUtils, Logging, Pool}
 import org.apache.kafka.common.{DirectoryId, KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.common.utils.{Exit, KafkaThread, Time, Utils}
 import org.apache.kafka.common.errors.{InconsistentTopicIdException, KafkaStorageException, LogDirNotFoundException}
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 
 import scala.jdk.CollectionConverters._
 import scala.collection._
@@ -1549,6 +1550,8 @@ object LogManager {
     val cleanerConfig = LogCleaner.cleanerConfig(config)
 
+    val transactionLogConfig = new TransactionLogConfig(config)
+
     new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile),
       initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile),
       configRepository = configRepository,
@@ -1560,8 +1563,8 @@ object LogManager {
       flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs,
       retentionCheckMs = config.logCleanupIntervalMs,
       maxTransactionTimeoutMs = config.transactionStateManagerConfig.transactionMaxTimeoutMs,
-      producerStateManagerConfig = new ProducerStateManagerConfig(config.transactionLogConfig.producerIdExpirationMs, config.transactionLogConfig.transactionPartitionVerificationEnable),
-      producerIdExpirationCheckIntervalMs = config.transactionLogConfig.producerIdExpirationCheckIntervalMs,
+      producerStateManagerConfig = new ProducerStateManagerConfig(transactionLogConfig.producerIdExpirationMs, transactionLogConfig.transactionPartitionVerificationEnable),
+      producerIdExpirationCheckIntervalMs = transactionLogConfig.producerIdExpirationCheckIntervalMs,
       scheduler = kafkaScheduler,
       brokerTopicStats = brokerTopicStats,
       logDirFailureChannel = logDirFailureChannel,
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index 85308473bf..a2c2bd4d80 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{CreateTopicsRequest, RequestContext, RequestHeader}
 import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.coordinator.share.ShareCoordinator
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
 
 import scala.collection.{Map, Seq, Set, mutable}
@@ -189,10 +190,11 @@ class DefaultAutoTopicCreationManager(
           .setReplicationFactor(config.groupCoordinatorConfig.offsetsTopicReplicationFactor)
           .setConfigs(convertToTopicConfigCollections(groupCoordinator.groupMetadataTopicConfigs))
       case TRANSACTION_STATE_TOPIC_NAME =>
+        val transactionLogConfig = new TransactionLogConfig(config)
         new CreatableTopic()
           .setName(topic)
-          .setNumPartitions(config.transactionLogConfig.transactionTopicPartitions)
-          .setReplicationFactor(config.transactionLogConfig.transactionTopicReplicationFactor)
+          .setNumPartitions(transactionLogConfig.transactionTopicPartitions)
+          .setReplicationFactor(transactionLogConfig.transactionTopicReplicationFactor)
           .setConfigs(convertToTopicConfigCollections(txnCoordinator.transactionTopicConfigs))
       case SHARE_GROUP_STATE_TOPIC_NAME =>
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 2b8be8518b..87bfd9cd21 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.coordinator.group.Group.GroupType
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
 import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
-import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, TransactionLogConfig, TransactionStateManagerConfig}
+import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, TransactionStateManagerConfig}
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.security.authorizer.AuthorizerUtils
@@ -204,7 +204,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
   private val _shareCoordinatorConfig = new ShareCoordinatorConfig(this)
   def shareCoordinatorConfig: ShareCoordinatorConfig = _shareCoordinatorConfig
-  override val transactionLogConfig = new TransactionLogConfig(this)
   private val _transactionStateManagerConfig = new TransactionStateManagerConfig(this)
   private val _addPartitionsToTxnConfig = new AddPartitionsToTxnConfig(this)
   def transactionStateManagerConfig: TransactionStateManagerConfig = _transactionStateManagerConfig
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 6ff88869d9..751f0cad59 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -47,6 +47,7 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.{Exit, Time, Utils}
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.metadata.MetadataCache
@@ -1037,9 +1038,10 @@ class ReplicaManager(val config: KafkaConfig,
     callback: ((Map[TopicPartition, Errors], Map[TopicPartition, VerificationGuard])) => Unit,
     transactionSupportedOperation: TransactionSupportedOperation
   ): Unit = {
+    val transactionLogConfig = new TransactionLogConfig(config)
     // Skip verification if the request is not transactional or transaction verification is disabled.
     if (transactionalId == null ||
-      (!config.transactionLogConfig.transactionPartitionVerificationEnable && !transactionSupportedOperation.supportsEpochBump)
+      (!transactionLogConfig.transactionPartitionVerificationEnable && !transactionSupportedOperation.supportsEpochBump)
       || addPartitionsToTxnManager.isEmpty
     ) {
       callback((Map.empty[TopicPartition, Errors], Map.empty[TopicPartition, VerificationGuard]))
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index ead385e744..de8f16e1e5 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.coordinator.share.ShareCoordinator
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.image.loader.LoaderManifest
 import org.apache.kafka.image.publisher.MetadataPublisher
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
@@ -331,9 +332,10 @@ class BrokerMetadataPublisher(
         case t: Throwable => fatalFaultHandler.handleFault("Error starting GroupCoordinator", t)
       }
       try {
+        val transactionLogConfig = new TransactionLogConfig(config)
         // Start the transaction coordinator.
         txnCoordinator.startup(() => metadataCache.numPartitions(
-          Topic.TRANSACTION_STATE_TOPIC_NAME).orElse(config.transactionLogConfig.transactionTopicPartitions))
+          Topic.TRANSACTION_STATE_TOPIC_NAME).orElse(transactionLogConfig.transactionTopicPartitions))
       } catch {
         case t: Throwable => fatalFaultHandler.handleFault("Error starting TransactionCoordinator", t)
       }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index e014f255d4..f1040bab42 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2523,7 +2523,8 @@ class ReplicaManagerTest {
       val props = new Properties()
       props.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "true")
       config.dynamicConfig.updateBrokerConfig(config.brokerId, props)
-      TestUtils.waitUntilTrue(() => config.transactionLogConfig.transactionPartitionVerificationEnable, "Config did not dynamically update.")
+      val transactionLogConfig = new TransactionLogConfig(config)
+      TestUtils.waitUntilTrue(() => transactionLogConfig.transactionPartitionVerificationEnable, "Config did not dynamically update.")
 
       // Try to append more records. We don't need to send a request since the transaction is already ongoing.
       val moreTransactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence + 1,
@@ -2575,7 +2576,8 @@ class ReplicaManagerTest {
       val props = new Properties()
       props.put(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, "false")
       config.dynamicConfig.updateBrokerConfig(config.brokerId, props)
-      TestUtils.waitUntilTrue(() => !config.transactionLogConfig.transactionPartitionVerificationEnable, "Config did not dynamically update.")
+      val transactionLogConfig = new TransactionLogConfig(config)
+      TestUtils.waitUntilTrue(() => !transactionLogConfig.transactionPartitionVerificationEnable, "Config did not dynamically update.")
 
       // Confirm we did not write to the log and instead returned error.
       val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue
diff --git a/server/src/main/java/org/apache/kafka/server/config/BrokerReconfigurable.java b/server-common/src/main/java/org/apache/kafka/server/config/BrokerReconfigurable.java
similarity index 89%
rename from server/src/main/java/org/apache/kafka/server/config/BrokerReconfigurable.java
rename to server-common/src/main/java/org/apache/kafka/server/config/BrokerReconfigurable.java
index 2b3fe9dcf3..864f5c2e24 100644
--- a/server/src/main/java/org/apache/kafka/server/config/BrokerReconfigurable.java
+++ b/server-common/src/main/java/org/apache/kafka/server/config/BrokerReconfigurable.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.server.config;
 
+import org.apache.kafka.common.config.AbstractConfig;
+
 import java.util.Set;
 
 /**
@@ -27,13 +29,12 @@ import java.util.Set;
  * The reconfiguration process follows three steps:
  * <ol>
  *   <li>Determining which configurations can be dynamically updated via {@link #reconfigurableConfigs()}</li>
- *   <li>Validating the new configuration before applying it via {@link #validateReconfiguration(AbstractKafkaConfig)}</li>
- *   <li>Applying the new configuration via {@link #reconfigure(AbstractKafkaConfig, AbstractKafkaConfig)}</li>
+ *   <li>Validating the new configuration before applying it via {@link #validateReconfiguration(AbstractConfig)}</li>
+ *   <li>Applying the new configuration via {@link #reconfigure(AbstractConfig, AbstractConfig)}</li>
  * </ol>
 * <strong>Note: Since Kafka is eliminating Scala, developers should implement this interface instead of {@link kafka.server.BrokerReconfigurable}</strong>
 *
 *
- * @see AbstractKafkaConfig
 */
 public interface BrokerReconfigurable {
     /**
@@ -55,7 +56,7 @@ public interface BrokerReconfigurable {
      *
      * @param newConfig the new configuration to validate
      */
-    void validateReconfiguration(AbstractKafkaConfig newConfig);
+    void validateReconfiguration(AbstractConfig newConfig);
 
     /**
      * Applies the new configuration.
@@ -65,5 +66,5 @@ public interface BrokerReconfigurable {
      * @param oldConfig the previous configuration
      * @param newConfig the new configuration to apply
      */
-    void reconfigure(AbstractKafkaConfig oldConfig, AbstractKafkaConfig newConfig);
+    void reconfigure(AbstractConfig oldConfig, AbstractConfig newConfig);
 }
diff --git a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index fc6906b96d..87bf18a412 100644
--- a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++ b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -83,6 +83,4 @@ public abstract class AbstractKafkaConfig extends AbstractConfig {
     public int backgroundThreads() {
         return getInt(ServerConfigs.BACKGROUND_THREADS_CONFIG);
     }
-
-    public abstract TransactionLogConfig transactionLogConfig();
 }
diff --git a/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java b/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java
index 4158194f00..a09e12d951 100644
--- a/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java
+++ b/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.server.config;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
 import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
@@ -39,16 +40,16 @@ public class DynamicProducerStateManagerConfig implements BrokerReconfigurable {
     }
 
     @Override
-    public void validateReconfiguration(AbstractKafkaConfig newConfig) {
-        TransactionLogConfig transactionLogConfig = newConfig.transactionLogConfig();
+    public void validateReconfiguration(AbstractConfig newConfig) {
+        TransactionLogConfig transactionLogConfig = new TransactionLogConfig(newConfig);
         if (transactionLogConfig.producerIdExpirationMs() < 0)
             throw new ConfigException(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG + "cannot be less than 0, current value is " + producerStateManagerConfig.producerIdExpirationMs() + ", and new value is " + transactionLogConfig.producerIdExpirationMs());
     }
 
     @Override
-    public void reconfigure(AbstractKafkaConfig oldConfig, AbstractKafkaConfig newConfig) {
-        TransactionLogConfig transactionLogConfig = newConfig.transactionLogConfig();
+    public void reconfigure(AbstractConfig oldConfig, AbstractConfig newConfig) {
+        TransactionLogConfig transactionLogConfig = new TransactionLogConfig(newConfig);
         if (producerStateManagerConfig.producerIdExpirationMs() != transactionLogConfig.producerIdExpirationMs()) {
             log.info("Reconfigure {} from {} to {}", TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG,
{code}
I haven't run the tests for the above patch, but it does decouple BrokerReconfigurable from AbstractKafkaConfig.
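With that change, an implementation only needs the generic AbstractConfig and derives the TransactionLogConfig view itself. A minimal sketch of what such an implementation could look like is below; the class and field names are made up for illustration, only the BrokerReconfigurable methods and the TransactionLogConfig constructor/accessors come from the patch:
{code:java}
package org.apache.kafka.server.config;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.coordinator.transaction.TransactionLogConfig;

import java.util.Set;

// Hypothetical example: reacts to producer.id.expiration.ms changes without
// ever touching AbstractKafkaConfig. The TransactionLogConfig view is
// re-derived from the plain AbstractConfig on every callback.
public class ExampleProducerIdExpirationReconfigurable implements BrokerReconfigurable {

    private volatile int producerIdExpirationMs;

    @Override
    public Set<String> reconfigurableConfigs() {
        return Set.of(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG);
    }

    @Override
    public void validateReconfiguration(AbstractConfig newConfig) {
        // Wrap the generic config into the transaction-log view and validate it.
        TransactionLogConfig transactionLogConfig = new TransactionLogConfig(newConfig);
        if (transactionLogConfig.producerIdExpirationMs() < 0)
            throw new ConfigException(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG
                + " cannot be less than 0, new value is " + transactionLogConfig.producerIdExpirationMs());
    }

    @Override
    public void reconfigure(AbstractConfig oldConfig, AbstractConfig newConfig) {
        // Apply the new value; oldConfig stays available for logging or diffing.
        producerIdExpirationMs = new TransactionLogConfig(newConfig).producerIdExpirationMs();
    }
}
{code}
The trade-off is that each call site allocates a fresh TransactionLogConfig instead of reusing the instance cached on KafkaConfig, which looks acceptable given how rarely these paths run.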
> Move LogCleaner to storage module
> ---------------------------------
>
> Key: KAFKA-14485
> URL: https://issues.apache.org/jira/browse/KAFKA-14485
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Ismael Juma
> Assignee: Dmitry Werner
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)