[GitHub] [kafka] ijuma commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.

2023-02-10 Thread via GitHub


ijuma commented on code in PR #13040:
URL: https://github.com/apache/kafka/pull/13040#discussion_r1102976553


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##
@@ -0,0 +1,676 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Crc32C;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Maintains a mapping from ProducerIds to metadata about the last appended 
entries (e.g.
+ * epoch, sequence number, last offset, etc.)
+ * 
+ * The sequence number is the last number successfully appended to the 
partition for the given identifier.
+ * The epoch is used for fencing against zombie writers. The offset is the one 
of the last successful message
+ * appended to the partition.
+ * 
+ * As long as a producer id is contained in the map, the corresponding 
producer can continue to write data.
+ * However, producer ids can be expired due to lack of recent use or if the 
last written entry has been deleted from
+ * the log (e.g. if the retention policy is "delete"). For compacted topics, 
the log cleaner will ensure
+ * that the most recent entry from a given producer id is retained in the log 
provided it hasn't expired due to
+ * age. This ensures that producer ids will not be expired until either the 
max expiration time has been reached,
+ * or if the topic also is configured for deletion, the segment containing the 
last written offset has
+ * been deleted.
+ */
+public class ProducerStateManager {
+
+public static final long LATE_TRANSACTION_BUFFER_MS = 5 * 60 * 1000;
+
+private static final short PRODUCER_SNAPSHOT_VERSION = 1;
+private static final String VERSION_FIELD = "version";
+private static final String CRC_FIELD = "crc";
+private static final String PRODUCER_ID_FIELD = "producer_id";
+private static final String LAST_SEQUENCE_FIELD = "last_sequence";
+private static final String PRODUCER_EPOCH_FIELD = "epoch";
+private static final String LAST_OFFSET_FIELD = "last_offset";
+private static final String OFFSET_DELTA_FIELD = "offset_delta";
+private static final String TIMESTAMP_FIELD = "timestamp";
+private static final String PRODUCER_ENTRIES_FIELD = "producer_entries";
+private static final String COORDINATOR_EPOCH_FIELD = "coordinator_epoch";
+private static final String CURRENT_TXN_FIRST_OFFSET_FIELD = 
"current_txn_first_offset";
+
+private static final int VERSION_OFFSET = 0;
+private static final int CRC_OFFSET = VERSION_OFFSET + 2;
+private static final int PRODUCER_ENTRIES_OFFSET = CRC_OFFSET + 4;
+
+private static final Schema PRODUCER_SNAPSHOT_ENTRY_SCHEMA =
+new Schema(new Field(PRODUCER_ID_FIELD, Type.INT64, "The producer 
ID"),
+new Field(PRODUCER_EPOCH_FIELD, Type.INT16, "Current epoch 
of the producer"),
+new Field(LAST_SEQUENCE_FIELD, 

[GitHub] [kafka] ijuma commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.

2023-02-10 Thread via GitHub


ijuma commented on code in PR #13040:
URL: https://github.com/apache/kafka/pull/13040#discussion_r1102975376


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -680,19 +680,23 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   private[log] def activeProducersWithLastSequence: Map[Long, Int] = lock 
synchronized {

Review Comment:
   Makes sense about keeping the implementation in `UnifiedLog`. I still think 
we should simply return `collection.Map` and avoid the copy.



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -680,19 +680,23 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   private[log] def activeProducersWithLastSequence: Map[Long, Int] = lock 
synchronized {

Review Comment:
   Makes sense about keeping the implementation in `UnifiedLog`. I still think 
we should simply return `collection.Map` and avoid the extra copy.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.

2023-02-10 Thread via GitHub


ijuma commented on code in PR #13040:
URL: https://github.com/apache/kafka/pull/13040#discussion_r1102882346


##
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##
@@ -685,7 +685,7 @@ class TransactionsTest extends IntegrationTestHarness {
   // get here without having bumped the epoch. If bumping the epoch is 
possible, the producer will attempt to, so
   // check there that the epoch has actually increased
   producerStateEntry =
-brokers(partitionLeader).logManager.getLog(new 
TopicPartition(testTopic, 
0)).get.producerStateManager.activeProducers(producerId)
+brokers(partitionLeader).logManager.getLog(new 
TopicPartition(testTopic, 
0)).get.producerStateManager.activeProducers.get(producerId)

Review Comment:
   It's fine for now since the test will still fail if no value is returned for 
the key, but it will result in a NPE instead of a NoSuchElementException.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.

2023-02-10 Thread via GitHub


ijuma commented on code in PR #13040:
URL: https://github.com/apache/kafka/pull/13040#discussion_r1102882346


##
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##
@@ -685,7 +685,7 @@ class TransactionsTest extends IntegrationTestHarness {
   // get here without having bumped the epoch. If bumping the epoch is 
possible, the producer will attempt to, so
   // check there that the epoch has actually increased
   producerStateEntry =
-brokers(partitionLeader).logManager.getLog(new 
TopicPartition(testTopic, 
0)).get.producerStateManager.activeProducers(producerId)
+brokers(partitionLeader).logManager.getLog(new 
TopicPartition(testTopic, 
0)).get.producerStateManager.activeProducers.get(producerId)

Review Comment:
   It's fine for now since the test will still fail if no value is returned for 
the key, but it will result in a NPE instead of a NoSuchKeyException.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.

2023-02-10 Thread via GitHub


ijuma commented on code in PR #13040:
URL: https://github.com/apache/kafka/pull/13040#discussion_r1102842008


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -680,19 +680,23 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   private[log] def activeProducersWithLastSequence: Map[Long, Int] = lock 
synchronized {

Review Comment:
   Is it right that this is only used in tests? If so, can we move this to 
`LogTestUtils`? Then we don't have to worry about the `toMap` copy. 
Alternatively, just change the return type to `scala.collection.Map` and remove 
the unnecessary `toMap` at the end.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.

2023-02-09 Thread via GitHub


ijuma commented on code in PR #13040:
URL: https://github.com/apache/kafka/pull/13040#discussion_r1101481342


##
core/src/test/scala/unit/kafka/log/LogTestUtils.scala:
##
@@ -247,7 +246,7 @@ object LogTestUtils {
   }
 
   def listProducerSnapshotOffsets(logDir: File): Seq[Long] =
-ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted
+
ProducerStateManager.listSnapshotFiles(logDir).asScala.map(_.offset).sorted.toSeq

Review Comment:
   `Buffer` is a `Seq`, but this is probably returning `immutable.Seq`. Since 
it's a test, we can leave as is (I hadn't noticed it was a test in the initial 
review.



##
core/src/test/scala/unit/kafka/log/LogTestUtils.scala:
##
@@ -247,7 +246,7 @@ object LogTestUtils {
   }
 
   def listProducerSnapshotOffsets(logDir: File): Seq[Long] =
-ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted
+
ProducerStateManager.listSnapshotFiles(logDir).asScala.map(_.offset).sorted.toSeq

Review Comment:
   `Buffer` is a `mutable.Seq`, but this is probably returning `immutable.Seq`. 
Since it's a test, we can leave as is (I hadn't noticed it was a test in the 
initial review.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.

2023-02-09 Thread via GitHub


ijuma commented on code in PR #13040:
URL: https://github.com/apache/kafka/pull/13040#discussion_r1101480397


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -680,20 +680,20 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   private[log] def activeProducersWithLastSequence: Map[Long, Int] = lock 
synchronized {
-producerStateManager.activeProducers.map { case (producerId, 
producerIdEntry) =>
-  (producerId, producerIdEntry.lastSeq)
+producerStateManager.activeProducers.asScala.map { case (producerId, 
producerIdEntry) =>
+  (producerId.toLong, producerIdEntry.lastSeq)
 }
-  }
+  }.toMap

Review Comment:
   I realized, but since we are creating a new map in this method, I don't see 
why an immutable Map is needed. Can we just change the result type to be 
`collection.Map`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.

2023-02-02 Thread via GitHub


ijuma commented on code in PR #13040:
URL: https://github.com/apache/kafka/pull/13040#discussion_r1095387492


##
storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateManager.java:
##
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Crc32C;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Maintains a mapping from ProducerIds to metadata about the last appended 
entries (e.g.
+ * epoch, sequence number, last offset, etc.)
+ * 
+ * The sequence number is the last number successfully appended to the 
partition for the given identifier.
+ * The epoch is used for fencing against zombie writers. The offset is the one 
of the last successful message
+ * appended to the partition.
+ * 
+ * As long as a producer id is contained in the map, the corresponding 
producer can continue to write data.
+ * However, producer ids can be expired due to lack of recent use or if the 
last written entry has been deleted from
+ * the log (e.g. if the retention policy is "delete"). For compacted topics, 
the log cleaner will ensure
+ * that the most recent entry from a given producer id is retained in the log 
provided it hasn't expired due to
+ * age. This ensures that producer ids will not be expired until either the 
max expiration time has been reached,
+ * or if the topic also is configured for deletion, the segment containing the 
last written offset has
+ * been deleted.
+ */
+public class ProducerStateManager {
+private static final Logger log = 
LoggerFactory.getLogger(ProducerStateManager.class.getName());
+
+// Remove these once UnifiedLog moves to storage module.
+public static final String DELETED_FILE_SUFFIX = ".deleted";
+public static final String PRODUCER_SNAPSHOT_FILE_SUFFIX = ".snapshot";
+
+public static final long LATE_TRANSACTION_BUFFER_MS = 5 * 60 * 1000;
+
+private static final short PRODUCER_SNAPSHOT_VERSION = 1;
+private static final String VERSION_FIELD = "version";
+private static final String CRC_FIELD = "crc";
+private static final String PRODUCER_ID_FIELD = "producer_id";
+private static final String LAST_SEQUENCE_FIELD = "last_sequence";
+private static final String PRODUCER_EPOCH_FIELD = "epoch";
+private static final String LAST_OFFSET_FIELD = "last_offset";
+private static final String OFFSET_DELTA_FIELD = "offset_delta";
+private static final String TIMESTAMP_FIELD = "timestamp";
+private static final String PRODUCER_ENTRIES_FIELD = "producer_entries";
+private static final String COORDINATOR_EPOCH_FIELD = "coordinator_epoch";
+private static final String CURRENT_TXN_FIRST_OFFSET_FIELD = 
"current_txn_first_offset";
+
+private static final int VERSION_OFFSET = 0;
+private static final int CRC_OFFSET = VERSION_OFFSET + 2;
+private static final int 

[GitHub] [kafka] ijuma commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.

2023-01-17 Thread GitBox


ijuma commented on code in PR #13040:
URL: https://github.com/apache/kafka/pull/13040#discussion_r1071365497


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -2104,7 +2105,7 @@ object UnifiedLog extends Logging {
 // (or later snapshots). Otherwise, if there is no snapshot file, then we 
have to rebuild producer state
 // from the first segment.
 if (recordVersion.value < RecordBatch.MAGIC_VALUE_V2 ||
-  (producerStateManager.latestSnapshotOffset.isEmpty && 
reloadFromCleanShutdown)) {
+  (producerStateManager.latestSnapshotOffset.asScala.isEmpty && 
reloadFromCleanShutdown)) {
   // To avoid an expensive scan through all of the segments, we take empty 
snapshots from the start of the

Review Comment:
   Can we call !isPresent instead of asScala.isEmpty?



##
core/src/test/scala/unit/kafka/log/LogLoaderTest.scala:
##
@@ -557,7 +558,7 @@ class LogLoaderTest {
   _topicId = None,
   keepPartitionMetadataFile = true)
 
-verify(stateManager).removeStraySnapshots(any[Seq[Long]])
+
verify(stateManager).removeStraySnapshots((any[java.util.List[java.lang.Long]]))

Review Comment:
   Are the extra parenthesis around `any` needed? We have a few similar 
examples in this file.



##
core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala:
##
@@ -340,8 +342,7 @@ class ProducerStateManagerTest {
 // After reloading from the snapshot, the transaction should still be 
considered late
 val reloadedStateManager = new ProducerStateManager(partition, logDir, 
maxTransactionTimeoutMs,
   producerStateManagerConfig, time)
-reloadedStateManager.truncateAndReload(logStartOffset = 0L,
-  logEndOffset = stateManager.mapEndOffset, currentTimeMs = 
time.milliseconds())
+reloadedStateManager.truncateAndReload(0L,stateManager.mapEndOffset, 
time.milliseconds())

Review Comment:
   Nit: space missing after `,`.



##
storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateManagerConfig.java:
##
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class ProducerStateManagerConfig {
+public static final Set RECONFIGURABLE_CONFIGS = 
Collections.singleton("producer.id.expiration.ms");
+private volatile int producerIdExpirationMs;
+
+public ProducerStateManagerConfig(int producerIdExpirationMs) {
+this.producerIdExpirationMs = producerIdExpirationMs;
+}
+
+public void updateProducerIdExpirationMs(int producerIdExpirationMs) {

Review Comment:
   Nit: use `set` instead of `update`?



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -680,20 +680,20 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   private[log] def activeProducersWithLastSequence: Map[Long, Int] = lock 
synchronized {
-producerStateManager.activeProducers.map { case (producerId, 
producerIdEntry) =>
-  (producerId, producerIdEntry.lastSeq)
+producerStateManager.activeProducers.asScala.map { case (producerId, 
producerIdEntry) =>
+  (producerId.toLong, producerIdEntry.lastSeq)
 }
-  }
+  }.toMap

Review Comment:
   Can we avoid this `toMap` copy? Same for the case a few lines below.



##
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##
@@ -652,7 +652,7 @@ class TransactionsTest extends IntegrationTestHarness {
   producer.commitTransaction()
 
   var producerStateEntry =
-brokers(partitionLeader).logManager.getLog(new 
TopicPartition(testTopic, 0)).get.producerStateManager.activeProducers.head._2
+brokers(partitionLeader).logManager.getLog(new 
TopicPartition(testTopic, 
0)).get.producerStateManager.activeProducers.asScala.head._2

Review Comment:
   Can we use `.get(0)` instead of `asScala.head`? There's one other similar 
example in this file.



##
core/src/test/scala/unit/kafka/log/LogTestUtils.scala:
##
@@ -247,7 +246,7 @@ object LogTestUtils {
   }
 
   def listProducerSnapshotOffsets(logDir: File): Seq[Long] =
-

[GitHub] [kafka] ijuma commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.

2023-01-16 Thread GitBox


ijuma commented on code in PR #13040:
URL: https://github.com/apache/kafka/pull/13040#discussion_r1071361752


##
core/src/main/scala/kafka/log/LogLoader.scala:
##
@@ -191,7 +192,7 @@ class LogLoader(
 // Reload all snapshots into the ProducerStateManager cache, the 
intermediate ProducerStateManager used
 // during log recovery may have deleted some files without the 
LogLoader.producerStateManager instance witnessing the
 // deletion.
-producerStateManager.removeStraySnapshots(segments.baseOffsets.toSeq)
+producerStateManager.removeStraySnapshots(segments.baseOffsets.toSeq.map(x 
=> Long.box(x)).asJava)

Review Comment:
   Seems like we do two collection copies here. Maybe we can tweak things so 
that only one of them is needed (and when we convert the segments code, none 
are needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org