This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dba789dc93d KAFKA-15853: Move OffsetConfig to group-coordinator module (#15161)
dba789dc93d is described below

commit dba789dc93d8dee2965ea41df49f0c981f5dfa6f
Author: Omnia Ibrahim <o.g.h.ibra...@gmail.com>
AuthorDate: Thu Jan 11 09:19:42 2024 +0000

    KAFKA-15853: Move OffsetConfig to group-coordinator module (#15161)
    
    
    Reviewers: Mickael Maison <mickael.mai...@gmail.com>, David Jacot <dja...@confluent.io>, Nikolay <nizhi...@apache.org>
---
 .../kafka/coordinator/group/GroupCoordinator.scala | 23 ++---
 .../coordinator/group/GroupMetadataManager.scala   |  1 +
 .../kafka/coordinator/group/OffsetConfig.scala     | 66 ---------------
 core/src/main/scala/kafka/server/KafkaConfig.scala | 20 ++---
 ...ListenersWithSameSecurityProtocolBaseTest.scala |  5 +-
 .../coordinator/group/GroupCoordinatorTest.scala   |  3 +-
 .../group/GroupMetadataManagerTest.scala           | 23 ++---
 .../kafka/coordinator/group/OffsetConfig.java      | 97 ++++++++++++++++++++++
 8 files changed, 136 insertions(+), 102 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index aba205372b6..cff348228d6 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.coordinator.group.OffsetConfig
 import org.apache.kafka.server.record.BrokerCompressionType
 import org.apache.kafka.storage.internals.log.VerificationGuard
 
@@ -1751,17 +1752,17 @@ object GroupCoordinator {
     GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, time, metrics)
   }
 
-  private[group] def offsetConfig(config: KafkaConfig) = OffsetConfig(
-    maxMetadataSize = config.offsetMetadataMaxSize,
-    loadBufferSize = config.offsetsLoadBufferSize,
-    offsetsRetentionMs = config.offsetsRetentionMinutes * 60L * 1000L,
-    offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
-    offsetsTopicNumPartitions = config.offsetsTopicPartitions,
-    offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes,
-    offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
-    offsetsTopicCompressionType = config.offsetsTopicCompressionType,
-    offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
-    offsetCommitRequiredAcks = config.offsetCommitRequiredAcks
+  private[group] def offsetConfig(config: KafkaConfig) = new OffsetConfig(
+    config.offsetMetadataMaxSize,
+    config.offsetsLoadBufferSize,
+    config.offsetsRetentionMinutes * 60L * 1000L,
+    config.offsetsRetentionCheckIntervalMs,
+    config.offsetsTopicPartitions,
+    config.offsetsTopicSegmentBytes,
+    config.offsetsTopicReplicationFactor,
+    config.offsetsTopicCompressionType,
+    config.offsetCommitTimeoutMs,
+    config.offsetCommitRequiredAcks
   )
 
   private[group] def apply(
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 31af1d81a22..3177576eed1 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -43,6 +43,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.{OffsetCommitRequest, OffsetFetchResponse}
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{KafkaException, MessageFormatter, TopicIdPartition, TopicPartition}
+import org.apache.kafka.coordinator.group.OffsetConfig
 import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, OffsetCommitKey, OffsetCommitValue, GroupMetadataKey => GroupMetadataKeyData}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_1_IV0, IBP_2_1_IV0, IBP_2_1_IV1, IBP_2_3_IV0}
diff --git a/core/src/main/scala/kafka/coordinator/group/OffsetConfig.scala b/core/src/main/scala/kafka/coordinator/group/OffsetConfig.scala
deleted file mode 100644
index 189e8e6ba5a..00000000000
--- a/core/src/main/scala/kafka/coordinator/group/OffsetConfig.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.coordinator.group
-
-import org.apache.kafka.common.record.CompressionType
-
-/**
- * Configuration settings for in-built offset management
- * @param maxMetadataSize The maximum allowed metadata for any offset commit.
- * @param loadBufferSize Batch size for reading from the offsets segments when loading offsets into the cache.
- * @param offsetsRetentionMs For subscribed consumers, committed offset of a specific partition will be expired and discarded when
- *                             1) this retention period has elapsed after the consumer group loses all its consumers (i.e. becomes empty);
- *                             2) this retention period has elapsed since the last time an offset is committed for the partition AND the group is no longer subscribed to the corresponding topic.
- *                           For standalone consumers (using manual assignment), offsets will be expired after this retention period has elapsed since the time of last commit.
- *                           Note that when a group is deleted via the delete-group request, its committed offsets will also be deleted immediately;
- *                           Also when a topic is deleted via the delete-topic request, upon propagated metadata update any group's committed offsets for that topic will also be deleted without extra retention period
- * @param offsetsRetentionCheckIntervalMs Frequency at which to check for expired offsets.
- * @param offsetsTopicNumPartitions The number of partitions for the offset commit topic (should not change after deployment).
- * @param offsetsTopicSegmentBytes The offsets topic segment bytes should be kept relatively small to facilitate faster
- *                                 log compaction and faster offset loads
- * @param offsetsTopicReplicationFactor The replication factor for the offset commit topic (set higher to ensure availability).
- * @param offsetsTopicCompressionCodec Compression codec for the offsets topic - compression should be turned on in
- *                                     order to achieve "atomic" commits.
- * @param offsetCommitTimeoutMs The offset commit will be delayed until all replicas for the offsets topic receive the
- *                              commit or this timeout is reached. (Similar to the producer request timeout.)
- * @param offsetCommitRequiredAcks The required acks before the commit can be accepted. In general, the default (-1)
- *                                 should not be overridden.
- */
-case class OffsetConfig(maxMetadataSize: Int = OffsetConfig.DefaultMaxMetadataSize,
-                        loadBufferSize: Int = OffsetConfig.DefaultLoadBufferSize,
-                        offsetsRetentionMs: Long = OffsetConfig.DefaultOffsetRetentionMs,
-                        offsetsRetentionCheckIntervalMs: Long = OffsetConfig.DefaultOffsetsRetentionCheckIntervalMs,
-                        offsetsTopicNumPartitions: Int = OffsetConfig.DefaultOffsetsTopicNumPartitions,
-                        offsetsTopicSegmentBytes: Int = OffsetConfig.DefaultOffsetsTopicSegmentBytes,
-                        offsetsTopicReplicationFactor: Short = OffsetConfig.DefaultOffsetsTopicReplicationFactor,
-                        offsetsTopicCompressionType: CompressionType = OffsetConfig.DefaultOffsetsTopicCompressionType,
-                        offsetCommitTimeoutMs: Int = OffsetConfig.DefaultOffsetCommitTimeoutMs,
-                        offsetCommitRequiredAcks: Short = OffsetConfig.DefaultOffsetCommitRequiredAcks)
-
-object OffsetConfig {
-  val DefaultMaxMetadataSize = 4096
-  val DefaultLoadBufferSize = 5*1024*1024
-  val DefaultOffsetRetentionMs = 24*60*60*1000L
-  val DefaultOffsetsRetentionCheckIntervalMs = 600000L
-  val DefaultOffsetsTopicNumPartitions = 50
-  val DefaultOffsetsTopicSegmentBytes = 100*1024*1024
-  val DefaultOffsetsTopicReplicationFactor = 3.toShort
-  val DefaultOffsetsTopicCompressionType = CompressionType.NONE
-  val DefaultOffsetCommitTimeoutMs = 5000
-  val DefaultOffsetCommitRequiredAcks = (-1).toShort
-}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 2afb1d64387..210722b652a 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -21,7 +21,6 @@ import java.{lang, util}
 import java.util.concurrent.TimeUnit
 import java.util.{Collections, Locale, Properties}
 import kafka.cluster.EndPoint
-import kafka.coordinator.group.OffsetConfig
 import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager}
 import kafka.security.authorizer.AuthorizerUtils
 import kafka.server.KafkaConfig.{ControllerListenerNamesProp, ListenerSecurityProtocolMapProp}
@@ -42,6 +41,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.coordinator.group.Group.GroupType
+import org.apache.kafka.coordinator.group.OffsetConfig
 import org.apache.kafka.coordinator.group.assignor.{PartitionAssignor, RangeAssignor, UniformAssignor}
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.ProcessRole
@@ -181,16 +181,16 @@ object Defaults {
   val ConsumerGroupAssignors = List(classOf[UniformAssignor].getName, classOf[RangeAssignor].getName).asJava
 
   /** ********* Offset management configuration ***********/
-  val OffsetMetadataMaxSize = OffsetConfig.DefaultMaxMetadataSize
-  val OffsetsLoadBufferSize = OffsetConfig.DefaultLoadBufferSize
-  val OffsetsTopicReplicationFactor = OffsetConfig.DefaultOffsetsTopicReplicationFactor
-  val OffsetsTopicPartitions: Int = OffsetConfig.DefaultOffsetsTopicNumPartitions
-  val OffsetsTopicSegmentBytes: Int = OffsetConfig.DefaultOffsetsTopicSegmentBytes
-  val OffsetsTopicCompressionCodec: Int = OffsetConfig.DefaultOffsetsTopicCompressionType.id
+  val OffsetMetadataMaxSize = OffsetConfig.DEFAULT_MAX_METADATA_SIZE
+  val OffsetsLoadBufferSize = OffsetConfig.DEFAULT_LOAD_BUFFER_SIZE
+  val OffsetsTopicReplicationFactor = OffsetConfig.DEFAULT_OFFSETS_TOPIC_REPLICATION_FACTOR
+  val OffsetsTopicPartitions: Int = OffsetConfig.DEFAULT_OFFSETS_TOPIC_NUM_PARTITIONS
+  val OffsetsTopicSegmentBytes: Int = OffsetConfig.DEFAULT_OFFSETS_TOPIC_SEGMENT_BYTES
+  val OffsetsTopicCompressionCodec: Int = OffsetConfig.DEFAULT_OFFSETS_TOPIC_COMPRESSION_TYPE.id
   val OffsetsRetentionMinutes: Int = 7 * 24 * 60
-  val OffsetsRetentionCheckIntervalMs: Long = OffsetConfig.DefaultOffsetsRetentionCheckIntervalMs
-  val OffsetCommitTimeoutMs = OffsetConfig.DefaultOffsetCommitTimeoutMs
-  val OffsetCommitRequiredAcks = OffsetConfig.DefaultOffsetCommitRequiredAcks
+  val OffsetsRetentionCheckIntervalMs: Long = OffsetConfig.DEFAULT_OFFSETS_RETENTION_CHECK_INTERVAL_MS
+  val OffsetCommitTimeoutMs = OffsetConfig.DEFAULT_OFFSET_COMMIT_TIMEOUT_MS
+  val OffsetCommitRequiredAcks = OffsetConfig.DEFAULT_OFFSET_COMMIT_REQUIRED_ACKS
 
   /** ********* Transaction management configuration ***********/
   val TransactionalIdExpirationMs = TransactionStateManager.DefaultTransactionalIdExpirationMs
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
index 58c3d85f698..4d7ddc15cb3 100644
--- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
@@ -20,9 +20,7 @@ package kafka.server
 
 import java.util.{Collections, Objects, Properties}
 import java.util.concurrent.TimeUnit
-
 import kafka.api.SaslSetup
-import kafka.coordinator.group.OffsetConfig
 import kafka.utils.JaasTestUtils.JaasSection
 import kafka.utils.{JaasTestUtils, TestUtils}
 import kafka.utils.Implicits._
@@ -31,6 +29,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.config.SslConfigs
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.network.{ListenerName, Mode}
+import org.apache.kafka.coordinator.group.OffsetConfig
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
@@ -108,7 +107,7 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
         s"Unexpected ${KafkaConfig.InterBrokerListenerNameProp} for broker ${config.brokerId}")
     }
 
-    TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, OffsetConfig.DefaultOffsetsTopicNumPartitions,
+    TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, OffsetConfig.DEFAULT_OFFSETS_TOPIC_NUM_PARTITIONS,
       replicationFactor = 2, servers, servers.head.groupCoordinator.groupMetadataTopicConfigs)
 
     createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 2b6e226ec64..4ffb5e6ab25 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
+import org.apache.kafka.coordinator.group.OffsetConfig
 import org.apache.kafka.server.util.timer.MockTimer
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
 import org.apache.kafka.storage.internals.log.VerificationGuard
@@ -3816,7 +3817,7 @@ class GroupCoordinatorTest {
     val producerEpoch: Short = 3
 
     val offsets = Map(
-      tip -> OffsetAndMetadata(offset, "s" * (OffsetConfig.DefaultMaxMetadataSize + 1), 0)
+      tip -> OffsetAndMetadata(offset, "s" * (OffsetConfig.DEFAULT_MAX_METADATA_SIZE + 1), 0)
     )
 
     val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, offsets)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index d43a1225598..b2c2ece0a57 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -42,6 +42,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.OffsetFetchResponse
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.coordinator.group.OffsetConfig
 import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, OffsetCommitValue}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion._
@@ -79,16 +80,16 @@ class GroupMetadataManagerTest {
 
   private val offsetConfig = {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(nodeId = 0, zkConnect = ""))
-    OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize,
-      loadBufferSize = config.offsetsLoadBufferSize,
-      offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
-      offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
-      offsetsTopicNumPartitions = config.offsetsTopicPartitions,
-      offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes,
-      offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
-      offsetsTopicCompressionType = config.offsetsTopicCompressionType,
-      offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
-      offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
+    new OffsetConfig(config.offsetMetadataMaxSize,
+      config.offsetsLoadBufferSize,
+      config.offsetsRetentionMinutes * 60 * 1000L,
+      config.offsetsRetentionCheckIntervalMs,
+      config.offsetsTopicPartitions,
+      config.offsetsTopicSegmentBytes,
+      config.offsetsTopicReplicationFactor,
+      config.offsetsTopicCompressionType,
+      config.offsetCommitTimeoutMs,
+      config.offsetCommitRequiredAcks)
   }
 
   @BeforeEach
@@ -775,7 +776,7 @@ class GroupMetadataManagerTest {
     )
 
     // create a GroupMetadata record larger then offsets.load.buffer.size (here at least 16 bytes larger)
-    val assignmentSize = OffsetConfig.DefaultLoadBufferSize + 16
+    val assignmentSize = OffsetConfig.DEFAULT_LOAD_BUFFER_SIZE + 16
     val memberId = "98098230493"
 
     val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java
new file mode 100644
index 00000000000..e15ce64cd82
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.record.CompressionType;
+
+public class OffsetConfig {
+    public static final int DEFAULT_MAX_METADATA_SIZE = 4096;
+    public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024;
+    public static final long DEFAULT_OFFSET_RETENTION_MS = 24 * 60 * 60 * 1000L;
+    public static final long DEFAULT_OFFSETS_RETENTION_CHECK_INTERVAL_MS = 600000L;
+    public static final int DEFAULT_OFFSETS_TOPIC_NUM_PARTITIONS = 50;
+    public static final int DEFAULT_OFFSETS_TOPIC_SEGMENT_BYTES = 100 * 1024 * 1024;
+    public static final short DEFAULT_OFFSETS_TOPIC_REPLICATION_FACTOR = 3;
+    public static final CompressionType DEFAULT_OFFSETS_TOPIC_COMPRESSION_TYPE = CompressionType.NONE;
+    public static final int DEFAULT_OFFSET_COMMIT_TIMEOUT_MS = 5000;
+    public static final short DEFAULT_OFFSET_COMMIT_REQUIRED_ACKS = -1;
+
+    public final int maxMetadataSize;
+    public final int loadBufferSize;
+    public final long offsetsRetentionMs;
+    public final long offsetsRetentionCheckIntervalMs;
+    public final int offsetsTopicNumPartitions;
+    public final int offsetsTopicSegmentBytes;
+    public final short offsetsTopicReplicationFactor;
+    public final CompressionType offsetsTopicCompressionType;
+    public final int offsetCommitTimeoutMs;
+    public final short offsetCommitRequiredAcks;
+
+    /**
+     * Configuration settings for in-built offset management
+     * @param maxMetadataSize The maximum allowed metadata for any offset commit.
+     * @param loadBufferSize Batch size for reading from the offsets segments when loading offsets into the cache.
+     * @param offsetsRetentionMs For subscribed consumers, committed offset of a specific partition will be expired and discarded when
+     *                             1) this retention period has elapsed after the consumer group loses all its consumers (i.e. becomes empty);
+     *                             2) this retention period has elapsed since the last time an offset is committed for the partition AND the group is no longer subscribed to the corresponding topic.
+     *                           For standalone consumers (using manual assignment), offsets will be expired after this retention period has elapsed since the time of last commit.
+     *                           Note that when a group is deleted via the delete-group request, its committed offsets will also be deleted immediately;
+     *                           Also when a topic is deleted via the delete-topic request, upon propagated metadata update any group's committed offsets for that topic will also be deleted without extra retention period
+     * @param offsetsRetentionCheckIntervalMs Frequency at which to check for expired offsets.
+     * @param offsetsTopicNumPartitions The number of partitions for the offset commit topic (should not change after deployment).
+     * @param offsetsTopicSegmentBytes The offsets topic segment bytes should be kept relatively small to facilitate faster
+     *                                 log compaction and faster offset loads
+     * @param offsetsTopicReplicationFactor The replication factor for the offset commit topic (set higher to ensure availability).
+     * @param offsetsTopicCompressionType Compression type for the offsets topic - compression should be turned on in
+     *                                     order to achieve "atomic" commits.
+     * @param offsetCommitTimeoutMs The offset commit will be delayed until all replicas for the offsets topic receive the
+     *                              commit or this timeout is reached. (Similar to the producer request timeout.)
+     * @param offsetCommitRequiredAcks The required acks before the commit can be accepted. In general, the default (-1)
+     *                                 should not be overridden.
+     */
+    public OffsetConfig(int maxMetadataSize,
+                        int loadBufferSize,
+                        long offsetsRetentionMs,
+                        long offsetsRetentionCheckIntervalMs,
+                        int offsetsTopicNumPartitions,
+                        int offsetsTopicSegmentBytes,
+                        short offsetsTopicReplicationFactor,
+                        CompressionType offsetsTopicCompressionType,
+                        int offsetCommitTimeoutMs,
+                        short offsetCommitRequiredAcks
+    ) {
+        this.maxMetadataSize = maxMetadataSize;
+        this.loadBufferSize = loadBufferSize;
+        this.offsetsRetentionMs = offsetsRetentionMs;
+        this.offsetsRetentionCheckIntervalMs = offsetsRetentionCheckIntervalMs;
+        this.offsetsTopicNumPartitions = offsetsTopicNumPartitions;
+        this.offsetsTopicSegmentBytes = offsetsTopicSegmentBytes;
+        this.offsetsTopicReplicationFactor = offsetsTopicReplicationFactor;
+        this.offsetsTopicCompressionType = offsetsTopicCompressionType;
+        this.offsetCommitTimeoutMs = offsetCommitTimeoutMs;
+        this.offsetCommitRequiredAcks = offsetCommitRequiredAcks;
+    }
+
+    public OffsetConfig() {
+        this(DEFAULT_MAX_METADATA_SIZE, DEFAULT_LOAD_BUFFER_SIZE, DEFAULT_OFFSET_RETENTION_MS,
+                DEFAULT_OFFSETS_RETENTION_CHECK_INTERVAL_MS, DEFAULT_OFFSETS_TOPIC_NUM_PARTITIONS,
+                DEFAULT_OFFSETS_TOPIC_SEGMENT_BYTES, DEFAULT_OFFSETS_TOPIC_REPLICATION_FACTOR,
+                DEFAULT_OFFSETS_TOPIC_COMPRESSION_TYPE, DEFAULT_OFFSET_COMMIT_TIMEOUT_MS,
+                DEFAULT_OFFSET_COMMIT_REQUIRED_ACKS);
+    }
+}
\ No newline at end of file

Reply via email to