This is an automated email from the ASF dual-hosted git repository. mimaison pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new dba789dc93d KAFKA-15853: Move OffsetConfig to group-coordinator module (#15161) dba789dc93d is described below commit dba789dc93d8dee2965ea41df49f0c981f5dfa6f Author: Omnia Ibrahim <o.g.h.ibra...@gmail.com> AuthorDate: Thu Jan 11 09:19:42 2024 +0000 KAFKA-15853: Move OffsetConfig to group-coordinator module (#15161) Reviewers: Mickael Maison <mickael.mai...@gmail.com>, David Jacot <dja...@confluent.io>, Nikolay <nizhi...@apache.org> --- .../kafka/coordinator/group/GroupCoordinator.scala | 23 ++--- .../coordinator/group/GroupMetadataManager.scala | 1 + .../kafka/coordinator/group/OffsetConfig.scala | 66 --------------- core/src/main/scala/kafka/server/KafkaConfig.scala | 20 ++--- ...ListenersWithSameSecurityProtocolBaseTest.scala | 5 +- .../coordinator/group/GroupCoordinatorTest.scala | 3 +- .../group/GroupMetadataManagerTest.scala | 23 ++--- .../kafka/coordinator/group/OffsetConfig.java | 97 ++++++++++++++++++++++ 8 files changed, 136 insertions(+), 102 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index aba205372b6..cff348228d6 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -32,6 +32,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.requests._ import org.apache.kafka.common.utils.Time +import org.apache.kafka.coordinator.group.OffsetConfig import org.apache.kafka.server.record.BrokerCompressionType import org.apache.kafka.storage.internals.log.VerificationGuard @@ -1751,17 +1752,17 @@ object GroupCoordinator { GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, time, metrics) } - private[group] def offsetConfig(config: KafkaConfig) = OffsetConfig( - maxMetadataSize = config.offsetMetadataMaxSize, - loadBufferSize = config.offsetsLoadBufferSize, - offsetsRetentionMs = config.offsetsRetentionMinutes * 60L * 1000L, - offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs, - offsetsTopicNumPartitions = config.offsetsTopicPartitions, - offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes, - offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor, - offsetsTopicCompressionType = config.offsetsTopicCompressionType, - offsetCommitTimeoutMs = config.offsetCommitTimeoutMs, - offsetCommitRequiredAcks = config.offsetCommitRequiredAcks + private[group] def offsetConfig(config: KafkaConfig) = new OffsetConfig( + config.offsetMetadataMaxSize, + config.offsetsLoadBufferSize, + config.offsetsRetentionMinutes * 60L * 1000L, + config.offsetsRetentionCheckIntervalMs, + config.offsetsTopicPartitions, + config.offsetsTopicSegmentBytes, + config.offsetsTopicReplicationFactor, + config.offsetsTopicCompressionType, + config.offsetCommitTimeoutMs, + config.offsetCommitRequiredAcks ) private[group] def apply( diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 31af1d81a22..3177576eed1 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -43,6 +43,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.{OffsetCommitRequest, OffsetFetchResponse} import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{KafkaException, MessageFormatter, TopicIdPartition, TopicPartition} +import org.apache.kafka.coordinator.group.OffsetConfig import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, OffsetCommitKey, OffsetCommitValue, GroupMetadataKey => GroupMetadataKeyData} import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_1_IV0, IBP_2_1_IV0, IBP_2_1_IV1, IBP_2_3_IV0} diff --git a/core/src/main/scala/kafka/coordinator/group/OffsetConfig.scala b/core/src/main/scala/kafka/coordinator/group/OffsetConfig.scala deleted file mode 100644 index 189e8e6ba5a..00000000000 --- a/core/src/main/scala/kafka/coordinator/group/OffsetConfig.scala +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.coordinator.group - -import org.apache.kafka.common.record.CompressionType - -/** - * Configuration settings for in-built offset management - * @param maxMetadataSize The maximum allowed metadata for any offset commit. - * @param loadBufferSize Batch size for reading from the offsets segments when loading offsets into the cache. - * @param offsetsRetentionMs For subscribed consumers, committed offset of a specific partition will be expired and discarded when - * 1) this retention period has elapsed after the consumer group loses all its consumers (i.e. becomes empty); - * 2) this retention period has elapsed since the last time an offset is committed for the partition AND the group is no longer subscribed to the corresponding topic. - * For standalone consumers (using manual assignment), offsets will be expired after this retention period has elapsed since the time of last commit. - * Note that when a group is deleted via the delete-group request, its committed offsets will also be deleted immediately; - * Also when a topic is deleted via the delete-topic request, upon propagated metadata update any group's committed offsets for that topic will also be deleted without extra retention period - * @param offsetsRetentionCheckIntervalMs Frequency at which to check for expired offsets. - * @param offsetsTopicNumPartitions The number of partitions for the offset commit topic (should not change after deployment). - * @param offsetsTopicSegmentBytes The offsets topic segment bytes should be kept relatively small to facilitate faster - * log compaction and faster offset loads - * @param offsetsTopicReplicationFactor The replication factor for the offset commit topic (set higher to ensure availability). - * @param offsetsTopicCompressionCodec Compression codec for the offsets topic - compression should be turned on in - * order to achieve "atomic" commits. - * @param offsetCommitTimeoutMs The offset commit will be delayed until all replicas for the offsets topic receive the - * commit or this timeout is reached. (Similar to the producer request timeout.) - * @param offsetCommitRequiredAcks The required acks before the commit can be accepted. In general, the default (-1) - * should not be overridden. - */ -case class OffsetConfig(maxMetadataSize: Int = OffsetConfig.DefaultMaxMetadataSize, - loadBufferSize: Int = OffsetConfig.DefaultLoadBufferSize, - offsetsRetentionMs: Long = OffsetConfig.DefaultOffsetRetentionMs, - offsetsRetentionCheckIntervalMs: Long = OffsetConfig.DefaultOffsetsRetentionCheckIntervalMs, - offsetsTopicNumPartitions: Int = OffsetConfig.DefaultOffsetsTopicNumPartitions, - offsetsTopicSegmentBytes: Int = OffsetConfig.DefaultOffsetsTopicSegmentBytes, - offsetsTopicReplicationFactor: Short = OffsetConfig.DefaultOffsetsTopicReplicationFactor, - offsetsTopicCompressionType: CompressionType = OffsetConfig.DefaultOffsetsTopicCompressionType, - offsetCommitTimeoutMs: Int = OffsetConfig.DefaultOffsetCommitTimeoutMs, - offsetCommitRequiredAcks: Short = OffsetConfig.DefaultOffsetCommitRequiredAcks) - -object OffsetConfig { - val DefaultMaxMetadataSize = 4096 - val DefaultLoadBufferSize = 5*1024*1024 - val DefaultOffsetRetentionMs = 24*60*60*1000L - val DefaultOffsetsRetentionCheckIntervalMs = 600000L - val DefaultOffsetsTopicNumPartitions = 50 - val DefaultOffsetsTopicSegmentBytes = 100*1024*1024 - val DefaultOffsetsTopicReplicationFactor = 3.toShort - val DefaultOffsetsTopicCompressionType = CompressionType.NONE - val DefaultOffsetCommitTimeoutMs = 5000 - val DefaultOffsetCommitRequiredAcks = (-1).toShort -} diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 2afb1d64387..210722b652a 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -21,7 +21,6 @@ import java.{lang, util} import java.util.concurrent.TimeUnit import java.util.{Collections, Locale, Properties} import kafka.cluster.EndPoint -import kafka.coordinator.group.OffsetConfig import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager} import kafka.security.authorizer.AuthorizerUtils import kafka.server.KafkaConfig.{ControllerListenerNamesProp, ListenerSecurityProtocolMapProp} @@ -42,6 +41,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder import org.apache.kafka.common.utils.Utils import org.apache.kafka.coordinator.group.Group.GroupType +import org.apache.kafka.coordinator.group.OffsetConfig import org.apache.kafka.coordinator.group.assignor.{PartitionAssignor, RangeAssignor, UniformAssignor} import org.apache.kafka.raft.RaftConfig import org.apache.kafka.server.ProcessRole @@ -181,16 +181,16 @@ object Defaults { val ConsumerGroupAssignors = List(classOf[UniformAssignor].getName, classOf[RangeAssignor].getName).asJava /** ********* Offset management configuration ***********/ - val OffsetMetadataMaxSize = OffsetConfig.DefaultMaxMetadataSize - val OffsetsLoadBufferSize = OffsetConfig.DefaultLoadBufferSize - val OffsetsTopicReplicationFactor = OffsetConfig.DefaultOffsetsTopicReplicationFactor - val OffsetsTopicPartitions: Int = OffsetConfig.DefaultOffsetsTopicNumPartitions - val OffsetsTopicSegmentBytes: Int = OffsetConfig.DefaultOffsetsTopicSegmentBytes - val OffsetsTopicCompressionCodec: Int = OffsetConfig.DefaultOffsetsTopicCompressionType.id + val OffsetMetadataMaxSize = OffsetConfig.DEFAULT_MAX_METADATA_SIZE + val OffsetsLoadBufferSize = OffsetConfig.DEFAULT_LOAD_BUFFER_SIZE + val OffsetsTopicReplicationFactor = OffsetConfig.DEFAULT_OFFSETS_TOPIC_REPLICATION_FACTOR + val OffsetsTopicPartitions: Int = OffsetConfig.DEFAULT_OFFSETS_TOPIC_NUM_PARTITIONS + val OffsetsTopicSegmentBytes: Int = OffsetConfig.DEFAULT_OFFSETS_TOPIC_SEGMENT_BYTES + val OffsetsTopicCompressionCodec: Int = OffsetConfig.DEFAULT_OFFSETS_TOPIC_COMPRESSION_TYPE.id val OffsetsRetentionMinutes: Int = 7 * 24 * 60 - val OffsetsRetentionCheckIntervalMs: Long = OffsetConfig.DefaultOffsetsRetentionCheckIntervalMs - val OffsetCommitTimeoutMs = OffsetConfig.DefaultOffsetCommitTimeoutMs - val OffsetCommitRequiredAcks = OffsetConfig.DefaultOffsetCommitRequiredAcks + val OffsetsRetentionCheckIntervalMs: Long = OffsetConfig.DEFAULT_OFFSETS_RETENTION_CHECK_INTERVAL_MS + val OffsetCommitTimeoutMs = OffsetConfig.DEFAULT_OFFSET_COMMIT_TIMEOUT_MS + val OffsetCommitRequiredAcks = OffsetConfig.DEFAULT_OFFSET_COMMIT_REQUIRED_ACKS /** ********* Transaction management configuration ***********/ val TransactionalIdExpirationMs = TransactionStateManager.DefaultTransactionalIdExpirationMs diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala index 58c3d85f698..4d7ddc15cb3 100644 --- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala +++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala @@ -20,9 +20,7 @@ package kafka.server import java.util.{Collections, Objects, Properties} import java.util.concurrent.TimeUnit - import kafka.api.SaslSetup -import kafka.coordinator.group.OffsetConfig import kafka.utils.JaasTestUtils.JaasSection import kafka.utils.{JaasTestUtils, TestUtils} import kafka.utils.Implicits._ @@ -31,6 +29,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.config.SslConfigs import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.network.{ListenerName, Mode} +import org.apache.kafka.coordinator.group.OffsetConfig import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} @@ -108,7 +107,7 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT s"Unexpected ${KafkaConfig.InterBrokerListenerNameProp} for broker ${config.brokerId}") } - TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, OffsetConfig.DefaultOffsetsTopicNumPartitions, + TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, OffsetConfig.DEFAULT_OFFSETS_TOPIC_NUM_PARTITIONS, replicationFactor = 2, servers, servers.head.groupCoordinator.groupMetadataTopicConfigs) createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 2b6e226ec64..4ffb5e6ab25 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -36,6 +36,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerProtocol import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity +import org.apache.kafka.coordinator.group.OffsetConfig import org.apache.kafka.server.util.timer.MockTimer import org.apache.kafka.server.util.{KafkaScheduler, MockTime} import org.apache.kafka.storage.internals.log.VerificationGuard @@ -3816,7 +3817,7 @@ class GroupCoordinatorTest { val producerEpoch: Short = 3 val offsets = Map( - tip -> OffsetAndMetadata(offset, "s" * (OffsetConfig.DefaultMaxMetadataSize + 1), 0) + tip -> OffsetAndMetadata(offset, "s" * (OffsetConfig.DEFAULT_MAX_METADATA_SIZE + 1), 0) ) val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, offsets) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index d43a1225598..b2c2ece0a57 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -42,6 +42,7 @@ import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.OffsetFetchResponse import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.Utils +import org.apache.kafka.coordinator.group.OffsetConfig import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, OffsetCommitValue} import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion._ @@ -79,16 +80,16 @@ class GroupMetadataManagerTest { private val offsetConfig = { val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")) - OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize, - loadBufferSize = config.offsetsLoadBufferSize, - offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L, - offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs, - offsetsTopicNumPartitions = config.offsetsTopicPartitions, - offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes, - offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor, - offsetsTopicCompressionType = config.offsetsTopicCompressionType, - offsetCommitTimeoutMs = config.offsetCommitTimeoutMs, - offsetCommitRequiredAcks = config.offsetCommitRequiredAcks) + new OffsetConfig(config.offsetMetadataMaxSize, + config.offsetsLoadBufferSize, + config.offsetsRetentionMinutes * 60 * 1000L, + config.offsetsRetentionCheckIntervalMs, + config.offsetsTopicPartitions, + config.offsetsTopicSegmentBytes, + config.offsetsTopicReplicationFactor, + config.offsetsTopicCompressionType, + config.offsetCommitTimeoutMs, + config.offsetCommitRequiredAcks) } @BeforeEach @@ -775,7 +776,7 @@ class GroupMetadataManagerTest { ) // create a GroupMetadata record larger then offsets.load.buffer.size (here at least 16 bytes larger) - val assignmentSize = OffsetConfig.DefaultLoadBufferSize + 16 + val assignmentSize = OffsetConfig.DEFAULT_LOAD_BUFFER_SIZE + 16 val memberId = "98098230493" val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java new file mode 100644 index 00000000000..e15ce64cd82 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.record.CompressionType; + +public class OffsetConfig { + public static final int DEFAULT_MAX_METADATA_SIZE = 4096; + public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024; + public static final long DEFAULT_OFFSET_RETENTION_MS = 24 * 60 * 60 * 1000L; + public static final long DEFAULT_OFFSETS_RETENTION_CHECK_INTERVAL_MS = 600000L; + public static final int DEFAULT_OFFSETS_TOPIC_NUM_PARTITIONS = 50; + public static final int DEFAULT_OFFSETS_TOPIC_SEGMENT_BYTES = 100 * 1024 * 1024; + public static final short DEFAULT_OFFSETS_TOPIC_REPLICATION_FACTOR = 3; + public static final CompressionType DEFAULT_OFFSETS_TOPIC_COMPRESSION_TYPE = CompressionType.NONE; + public static final int DEFAULT_OFFSET_COMMIT_TIMEOUT_MS = 5000; + public static final short DEFAULT_OFFSET_COMMIT_REQUIRED_ACKS = -1; + + public final int maxMetadataSize; + public final int loadBufferSize; + public final long offsetsRetentionMs; + public final long offsetsRetentionCheckIntervalMs; + public final int offsetsTopicNumPartitions; + public final int offsetsTopicSegmentBytes; + public final short offsetsTopicReplicationFactor; + public final CompressionType offsetsTopicCompressionType; + public final int offsetCommitTimeoutMs; + public final short offsetCommitRequiredAcks; + + /** + * Configuration settings for in-built offset management + * @param maxMetadataSize The maximum allowed metadata for any offset commit. + * @param loadBufferSize Batch size for reading from the offsets segments when loading offsets into the cache. + * @param offsetsRetentionMs For subscribed consumers, committed offset of a specific partition will be expired and discarded when + * 1) this retention period has elapsed after the consumer group loses all its consumers (i.e. becomes empty); + * 2) this retention period has elapsed since the last time an offset is committed for the partition AND the group is no longer subscribed to the corresponding topic. + * For standalone consumers (using manual assignment), offsets will be expired after this retention period has elapsed since the time of last commit. + * Note that when a group is deleted via the delete-group request, its committed offsets will also be deleted immediately; + * Also when a topic is deleted via the delete-topic request, upon propagated metadata update any group's committed offsets for that topic will also be deleted without extra retention period + * @param offsetsRetentionCheckIntervalMs Frequency at which to check for expired offsets. + * @param offsetsTopicNumPartitions The number of partitions for the offset commit topic (should not change after deployment). + * @param offsetsTopicSegmentBytes The offsets topic segment bytes should be kept relatively small to facilitate faster + * log compaction and faster offset loads + * @param offsetsTopicReplicationFactor The replication factor for the offset commit topic (set higher to ensure availability). + * @param offsetsTopicCompressionType Compression type for the offsets topic - compression should be turned on in + * order to achieve "atomic" commits. + * @param offsetCommitTimeoutMs The offset commit will be delayed until all replicas for the offsets topic receive the + * commit or this timeout is reached. (Similar to the producer request timeout.) + * @param offsetCommitRequiredAcks The required acks before the commit can be accepted. In general, the default (-1) + * should not be overridden. + */ + public OffsetConfig(int maxMetadataSize, + int loadBufferSize, + long offsetsRetentionMs, + long offsetsRetentionCheckIntervalMs, + int offsetsTopicNumPartitions, + int offsetsTopicSegmentBytes, + short offsetsTopicReplicationFactor, + CompressionType offsetsTopicCompressionType, + int offsetCommitTimeoutMs, + short offsetCommitRequiredAcks + ) { + this.maxMetadataSize = maxMetadataSize; + this.loadBufferSize = loadBufferSize; + this.offsetsRetentionMs = offsetsRetentionMs; + this.offsetsRetentionCheckIntervalMs = offsetsRetentionCheckIntervalMs; + this.offsetsTopicNumPartitions = offsetsTopicNumPartitions; + this.offsetsTopicSegmentBytes = offsetsTopicSegmentBytes; + this.offsetsTopicReplicationFactor = offsetsTopicReplicationFactor; + this.offsetsTopicCompressionType = offsetsTopicCompressionType; + this.offsetCommitTimeoutMs = offsetCommitTimeoutMs; + this.offsetCommitRequiredAcks = offsetCommitRequiredAcks; + } + + public OffsetConfig() { + this(DEFAULT_MAX_METADATA_SIZE, DEFAULT_LOAD_BUFFER_SIZE, DEFAULT_OFFSET_RETENTION_MS, + DEFAULT_OFFSETS_RETENTION_CHECK_INTERVAL_MS, DEFAULT_OFFSETS_TOPIC_NUM_PARTITIONS, + DEFAULT_OFFSETS_TOPIC_SEGMENT_BYTES, DEFAULT_OFFSETS_TOPIC_REPLICATION_FACTOR, + DEFAULT_OFFSETS_TOPIC_COMPRESSION_TYPE, DEFAULT_OFFSET_COMMIT_TIMEOUT_MS, + DEFAULT_OFFSET_COMMIT_REQUIRED_ACKS); + } +} \ No newline at end of file