[GitHub] [kafka] ijuma commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.
ijuma commented on code in PR #13040: URL: https://github.com/apache/kafka/pull/13040#discussion_r1102976553 ## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java: ## @@ -0,0 +1,676 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.SchemaException; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.protocol.types.Type; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.Crc32C; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Maintains a mapping from ProducerIds to metadata about the last appended entries (e.g. + * epoch, sequence number, last offset, etc.) + * + * The sequence number is the last number successfully appended to the partition for the given identifier. + * The epoch is used for fencing against zombie writers. The offset is the one of the last successful message + * appended to the partition. + * + * As long as a producer id is contained in the map, the corresponding producer can continue to write data. 
+ * However, producer ids can be expired due to lack of recent use or if the last written entry has been deleted from + * the log (e.g. if the retention policy is "delete"). For compacted topics, the log cleaner will ensure + * that the most recent entry from a given producer id is retained in the log provided it hasn't expired due to + * age. This ensures that producer ids will not be expired until either the max expiration time has been reached, + * or if the topic also is configured for deletion, the segment containing the last written offset has + * been deleted. + */ +public class ProducerStateManager { + +public static final long LATE_TRANSACTION_BUFFER_MS = 5 * 60 * 1000; + +private static final short PRODUCER_SNAPSHOT_VERSION = 1; +private static final String VERSION_FIELD = "version"; +private static final String CRC_FIELD = "crc"; +private static final String PRODUCER_ID_FIELD = "producer_id"; +private static final String LAST_SEQUENCE_FIELD = "last_sequence"; +private static final String PRODUCER_EPOCH_FIELD = "epoch"; +private static final String LAST_OFFSET_FIELD = "last_offset"; +private static final String OFFSET_DELTA_FIELD = "offset_delta"; +private static final String TIMESTAMP_FIELD = "timestamp"; +private static final String PRODUCER_ENTRIES_FIELD = "producer_entries"; +private static final String COORDINATOR_EPOCH_FIELD = "coordinator_epoch"; +private static final String CURRENT_TXN_FIRST_OFFSET_FIELD = "current_txn_first_offset"; + +private static final int VERSION_OFFSET = 0; +private static final int CRC_OFFSET = VERSION_OFFSET + 2; +private static final int PRODUCER_ENTRIES_OFFSET = CRC_OFFSET + 4; + +private static final Schema PRODUCER_SNAPSHOT_ENTRY_SCHEMA = +new Schema(new Field(PRODUCER_ID_FIELD, Type.INT64, "The producer ID"), +new Field(PRODUCER_EPOCH_FIELD, Type.INT16, "Current epoch of the producer"), +new Field(LAST_SEQUENCE_FIELD,
[GitHub] [kafka] ijuma commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.
ijuma commented on code in PR #13040: URL: https://github.com/apache/kafka/pull/13040#discussion_r1102975376 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -680,19 +680,23 @@ class UnifiedLog(@volatile var logStartOffset: Long, } private[log] def activeProducersWithLastSequence: Map[Long, Int] = lock synchronized { Review Comment: Makes sense about keeping the implementation in `UnifiedLog`. I still think we should simply return `collection.Map` and avoid the copy. ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -680,19 +680,23 @@ class UnifiedLog(@volatile var logStartOffset: Long, } private[log] def activeProducersWithLastSequence: Map[Long, Int] = lock synchronized { Review Comment: Makes sense about keeping the implementation in `UnifiedLog`. I still think we should simply return `collection.Map` and avoid the extra copy. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.
ijuma commented on code in PR #13040: URL: https://github.com/apache/kafka/pull/13040#discussion_r1102882346 ## core/src/test/scala/integration/kafka/api/TransactionsTest.scala: ## @@ -685,7 +685,7 @@ class TransactionsTest extends IntegrationTestHarness { // get here without having bumped the epoch. If bumping the epoch is possible, the producer will attempt to, so // check there that the epoch has actually increased producerStateEntry = -brokers(partitionLeader).logManager.getLog(new TopicPartition(testTopic, 0)).get.producerStateManager.activeProducers(producerId) +brokers(partitionLeader).logManager.getLog(new TopicPartition(testTopic, 0)).get.producerStateManager.activeProducers.get(producerId) Review Comment: It's fine for now since the test will still fail if no value is returned for the key, but it will result in an NPE instead of a NoSuchElementException. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.
ijuma commented on code in PR #13040: URL: https://github.com/apache/kafka/pull/13040#discussion_r1102882346 ## core/src/test/scala/integration/kafka/api/TransactionsTest.scala: ## @@ -685,7 +685,7 @@ class TransactionsTest extends IntegrationTestHarness { // get here without having bumped the epoch. If bumping the epoch is possible, the producer will attempt to, so // check there that the epoch has actually increased producerStateEntry = -brokers(partitionLeader).logManager.getLog(new TopicPartition(testTopic, 0)).get.producerStateManager.activeProducers(producerId) +brokers(partitionLeader).logManager.getLog(new TopicPartition(testTopic, 0)).get.producerStateManager.activeProducers.get(producerId) Review Comment: It's fine for now since the test will still fail if no value is returned for the key, but it will result in an NPE instead of a NoSuchElementException. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.
ijuma commented on code in PR #13040: URL: https://github.com/apache/kafka/pull/13040#discussion_r1102842008 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -680,19 +680,23 @@ class UnifiedLog(@volatile var logStartOffset: Long, } private[log] def activeProducersWithLastSequence: Map[Long, Int] = lock synchronized { Review Comment: Is it right that this is only used in tests? If so, can we move this to `LogTestUtils`? Then we don't have to worry about the `toMap` copy. Alternatively, just change the return type to `scala.collection.Map` and remove the unnecessary `toMap` at the end. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.
ijuma commented on code in PR #13040: URL: https://github.com/apache/kafka/pull/13040#discussion_r1101481342 ## core/src/test/scala/unit/kafka/log/LogTestUtils.scala: ## @@ -247,7 +246,7 @@ object LogTestUtils { } def listProducerSnapshotOffsets(logDir: File): Seq[Long] = -ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted + ProducerStateManager.listSnapshotFiles(logDir).asScala.map(_.offset).sorted.toSeq Review Comment: `Buffer` is a `Seq`, but this is probably returning `immutable.Seq`. Since it's a test, we can leave as is (I hadn't noticed it was a test in the initial review). ## core/src/test/scala/unit/kafka/log/LogTestUtils.scala: ## @@ -247,7 +246,7 @@ object LogTestUtils { } def listProducerSnapshotOffsets(logDir: File): Seq[Long] = -ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted + ProducerStateManager.listSnapshotFiles(logDir).asScala.map(_.offset).sorted.toSeq Review Comment: `Buffer` is a `mutable.Seq`, but this is probably returning `immutable.Seq`. Since it's a test, we can leave as is (I hadn't noticed it was a test in the initial review). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.
ijuma commented on code in PR #13040: URL: https://github.com/apache/kafka/pull/13040#discussion_r1101480397 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -680,20 +680,20 @@ class UnifiedLog(@volatile var logStartOffset: Long, } private[log] def activeProducersWithLastSequence: Map[Long, Int] = lock synchronized { -producerStateManager.activeProducers.map { case (producerId, producerIdEntry) => - (producerId, producerIdEntry.lastSeq) +producerStateManager.activeProducers.asScala.map { case (producerId, producerIdEntry) => + (producerId.toLong, producerIdEntry.lastSeq) } - } + }.toMap Review Comment: I realize that, but since we are creating a new map in this method, I don't see why an immutable Map is needed. Can we just change the result type to be `collection.Map`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.
ijuma commented on code in PR #13040: URL: https://github.com/apache/kafka/pull/13040#discussion_r1095387492 ## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateManager.java: ## @@ -0,0 +1,671 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.SchemaException; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.protocol.types.Type; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.Crc32C; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.text.NumberFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Maintains a mapping from ProducerIds to metadata about the last appended entries (e.g. + * epoch, sequence number, last offset, etc.) + * + * The sequence number is the last number successfully appended to the partition for the given identifier. + * The epoch is used for fencing against zombie writers. The offset is the one of the last successful message + * appended to the partition. + * + * As long as a producer id is contained in the map, the corresponding producer can continue to write data. 
+ * However, producer ids can be expired due to lack of recent use or if the last written entry has been deleted from + * the log (e.g. if the retention policy is "delete"). For compacted topics, the log cleaner will ensure + * that the most recent entry from a given producer id is retained in the log provided it hasn't expired due to + * age. This ensures that producer ids will not be expired until either the max expiration time has been reached, + * or if the topic also is configured for deletion, the segment containing the last written offset has + * been deleted. + */ +public class ProducerStateManager { +private static final Logger log = LoggerFactory.getLogger(ProducerStateManager.class.getName()); + +// Remove these once UnifiedLog moves to storage module. +public static final String DELETED_FILE_SUFFIX = ".deleted"; +public static final String PRODUCER_SNAPSHOT_FILE_SUFFIX = ".snapshot"; + +public static final long LATE_TRANSACTION_BUFFER_MS = 5 * 60 * 1000; + +private static final short PRODUCER_SNAPSHOT_VERSION = 1; +private static final String VERSION_FIELD = "version"; +private static final String CRC_FIELD = "crc"; +private static final String PRODUCER_ID_FIELD = "producer_id"; +private static final String LAST_SEQUENCE_FIELD = "last_sequence"; +private static final String PRODUCER_EPOCH_FIELD = "epoch"; +private static final String LAST_OFFSET_FIELD = "last_offset"; +private static final String OFFSET_DELTA_FIELD = "offset_delta"; +private static final String TIMESTAMP_FIELD = "timestamp"; +private static final String PRODUCER_ENTRIES_FIELD = "producer_entries"; +private static final String COORDINATOR_EPOCH_FIELD = "coordinator_epoch"; +private static final String CURRENT_TXN_FIRST_OFFSET_FIELD = "current_txn_first_offset"; + +private static final int VERSION_OFFSET = 0; +private static final int CRC_OFFSET = VERSION_OFFSET + 2; +private static final int
[GitHub] [kafka] ijuma commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.
ijuma commented on code in PR #13040: URL: https://github.com/apache/kafka/pull/13040#discussion_r1071365497 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -2104,7 +2105,7 @@ object UnifiedLog extends Logging { // (or later snapshots). Otherwise, if there is no snapshot file, then we have to rebuild producer state // from the first segment. if (recordVersion.value < RecordBatch.MAGIC_VALUE_V2 || - (producerStateManager.latestSnapshotOffset.isEmpty && reloadFromCleanShutdown)) { + (producerStateManager.latestSnapshotOffset.asScala.isEmpty && reloadFromCleanShutdown)) { // To avoid an expensive scan through all of the segments, we take empty snapshots from the start of the Review Comment: Can we call `!isPresent` instead of `asScala.isEmpty`? ## core/src/test/scala/unit/kafka/log/LogLoaderTest.scala: ## @@ -557,7 +558,7 @@ class LogLoaderTest { _topicId = None, keepPartitionMetadataFile = true) -verify(stateManager).removeStraySnapshots(any[Seq[Long]]) + verify(stateManager).removeStraySnapshots((any[java.util.List[java.lang.Long]])) Review Comment: Are the extra parentheses around `any` needed? We have a few similar examples in this file. ## core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala: ## @@ -340,8 +342,7 @@ class ProducerStateManagerTest { // After reloading from the snapshot, the transaction should still be considered late val reloadedStateManager = new ProducerStateManager(partition, logDir, maxTransactionTimeoutMs, producerStateManagerConfig, time) -reloadedStateManager.truncateAndReload(logStartOffset = 0L, - logEndOffset = stateManager.mapEndOffset, currentTimeMs = time.milliseconds()) +reloadedStateManager.truncateAndReload(0L,stateManager.mapEndOffset, time.milliseconds()) Review Comment: Nit: space missing after `,`.
## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateManagerConfig.java: ## @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import java.util.Collections; +import java.util.Set; + +public class ProducerStateManagerConfig { +public static final Set RECONFIGURABLE_CONFIGS = Collections.singleton("producer.id.expiration.ms"); +private volatile int producerIdExpirationMs; + +public ProducerStateManagerConfig(int producerIdExpirationMs) { +this.producerIdExpirationMs = producerIdExpirationMs; +} + +public void updateProducerIdExpirationMs(int producerIdExpirationMs) { Review Comment: Nit: use `set` instead of `update`? 
## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -680,20 +680,20 @@ class UnifiedLog(@volatile var logStartOffset: Long, } private[log] def activeProducersWithLastSequence: Map[Long, Int] = lock synchronized { -producerStateManager.activeProducers.map { case (producerId, producerIdEntry) => - (producerId, producerIdEntry.lastSeq) +producerStateManager.activeProducers.asScala.map { case (producerId, producerIdEntry) => + (producerId.toLong, producerIdEntry.lastSeq) } - } + }.toMap Review Comment: Can we avoid this `toMap` copy? Same for the case a few lines below. ## core/src/test/scala/integration/kafka/api/TransactionsTest.scala: ## @@ -652,7 +652,7 @@ class TransactionsTest extends IntegrationTestHarness { producer.commitTransaction() var producerStateEntry = -brokers(partitionLeader).logManager.getLog(new TopicPartition(testTopic, 0)).get.producerStateManager.activeProducers.head._2 +brokers(partitionLeader).logManager.getLog(new TopicPartition(testTopic, 0)).get.producerStateManager.activeProducers.asScala.head._2 Review Comment: Can we use `.get(0)` instead of `asScala.head`? There's one other similar example in this file. ## core/src/test/scala/unit/kafka/log/LogTestUtils.scala: ## @@ -247,7 +246,7 @@ object LogTestUtils { } def listProducerSnapshotOffsets(logDir: File): Seq[Long] = -
[GitHub] [kafka] ijuma commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.
ijuma commented on code in PR #13040: URL: https://github.com/apache/kafka/pull/13040#discussion_r1071361752 ## core/src/main/scala/kafka/log/LogLoader.scala: ## @@ -191,7 +192,7 @@ class LogLoader( // Reload all snapshots into the ProducerStateManager cache, the intermediate ProducerStateManager used // during log recovery may have deleted some files without the LogLoader.producerStateManager instance witnessing the // deletion. -producerStateManager.removeStraySnapshots(segments.baseOffsets.toSeq) +producerStateManager.removeStraySnapshots(segments.baseOffsets.toSeq.map(x => Long.box(x)).asJava) Review Comment: Seems like we do two collection copies here. Maybe we can tweak things so that only one of them is needed (and when we convert the segments code, none are needed). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org