[GitHub] [kafka] mjsax commented on pull request #7414: MINOR: Fixed a division by 0 scenario

2023-02-03 Thread via GitHub


mjsax commented on PR #7414:
URL: https://github.com/apache/kafka/pull/7414#issuecomment-1416593923

   Addressed via https://github.com/apache/kafka/pull/13175 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax merged pull request #13143: KAFKA-14491: [3/N] Add logical key value segments

2023-02-03 Thread via GitHub


mjsax merged PR #13143:
URL: https://github.com/apache/kafka/pull/13143





[GitHub] [kafka] guozhangwang commented on a diff in pull request #13021: KAFKA-14468: Implement CommitRequestManager to manage the commit and autocommit requests

2023-02-03 Thread via GitHub


guozhangwang commented on code in PR #13021:
URL: https://github.com/apache/kafka/pull/13021#discussion_r1096346374


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/GroupStateManager.java:
##
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+
+import java.util.Objects;
+import java.util.Optional;
+
+public class GroupStateManager {

Review Comment:
   Maybe just call it `GroupState`, since it does not do any management and is 
more of a grouping of multiple primitive states?
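To illustrate the distinction this comment draws, here is a minimal sketch of what a plain state holder might look like. The class name and all fields (`groupId`, `groupInstanceId`, `generation`, `memberId`) are assumptions chosen for illustration and are not taken from the PR; the point is only that such a class groups values and has no behavior that "manages" anything.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical sketch: a holder that only groups related consumer-group values.
// Field names are illustrative assumptions, not the PR's actual members.
final class GroupStateSketch {
    final String groupId;
    final Optional<String> groupInstanceId;
    int generation = -1;   // assumed placeholder meaning "no generation yet"
    String memberId = "";  // assumed placeholder meaning "no member id yet"

    GroupStateSketch(String groupId, String groupInstanceId) {
        this.groupId = Objects.requireNonNull(groupId, "groupId");
        this.groupInstanceId = Optional.ofNullable(groupInstanceId);
    }
}
```

A class shaped like this carries no request-building or lifecycle logic, which is arguably why the shorter name fits it better.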



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##
@@ -73,7 +76,9 @@ public class DefaultBackgroundThread extends KafkaThread {
 final ApplicationEventProcessor processor,
 final ConsumerMetadata metadata,
 final NetworkClientDelegate networkClient,
-final CoordinatorRequestManager coordinatorManager) {
+final GroupStateManager groupState,
+final CoordinatorRequestManager coordinatorManager,
+CommitRequestManager commitRequestManager) {

Review Comment:
   Why is this not final?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+public class CommitRequestManager implements RequestManager {
+private final Queue stagedCommits;
+// TODO: We will need to refactor the subscriptionState
+private final SubscriptionState subscriptionState;
+private final Logger log;
+private final Optional autoCommitState;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final GroupStateManager groupState;

Review Comment:
   I'd suggest we either change the class name to just `GroupState` or name 
this field `groupStateManager`? Personally I prefer the former.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache 

[GitHub] [kafka] rondagostino opened a new pull request, #13198: MINOR: Rename IBP_3_4_IV1 as added for KIP-405 to IBP_3_5_IV0

2023-02-03 Thread via GitHub


rondagostino opened a new pull request, #13198:
URL: https://github.com/apache/kafka/pull/13198

   https://github.com/apache/kafka/pull/11390/ will be released as part of AK 
3.5 but it added the MetadataVersion `IBP_3_4_IV1`.  It should have been 
`IBP_3_5_IV0`.  This change causes no incompatibility because this feature is 
not yet released.
   
   Also, `didMetadataChange` is adjusted to `false` because the KRaft metadata 
log records are unaffected.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-14139) Replaced disk can lead to loss of committed data even with non-empty ISR

2023-02-03 Thread Calvin Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17684045#comment-17684045
 ] 

Calvin Liu commented on KAFKA-14139:


Hi Alex, just posted the KIP. Let me know if you have any comments.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR

> Replaced disk can lead to loss of committed data even with non-empty ISR
> 
>
> Key: KAFKA-14139
> URL: https://issues.apache.org/jira/browse/KAFKA-14139
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Calvin Liu
>Priority: Major
> Fix For: 3.5.0
>
>
> We have been thinking about disk failure cases recently. Suppose that a disk 
> has failed and the user needs to restart the disk from an empty state. The 
> concern is whether this can lead to the unnecessary loss of committed data.
> For normal topic partitions, removal from the ISR during controlled shutdown 
> buys us some protection. After the replica is restarted, it must prove its 
> state to the leader before it can be added back to the ISR. And it cannot 
> become a leader until it does so.
> An obvious exception to this is when the replica is the last member in the 
> ISR. In this case, the disk failure itself has compromised the committed 
> data, so some amount of loss must be expected.
> We have been considering other scenarios in which the loss of one disk can 
> lead to data loss even when there are replicas remaining which have all of 
> the committed entries. One such scenario is this:
> Suppose we have a partition with two replicas: A and B. Initially A is the 
> leader and it is the only member of the ISR.
>  # Broker B catches up to A, so A attempts to send an AlterPartition request 
> to the controller to add B into the ISR.
>  # Before the AlterPartition request is received, replica B has a hard 
> failure.
>  # The current controller successfully fences broker B. It takes no action on 
> this partition since B is already out of the ISR.
>  # Before the controller receives the AlterPartition request to add B, it 
> also fails.
>  # While the new controller is initializing, suppose that replica B finishes 
> startup, but the disk has been replaced (all of the previous state has been 
> lost).
>  # The new controller sees the registration from broker B first.
>  # Finally, the AlterPartition from A arrives which adds B back into the ISR 
> even though it has an empty log.
> (Credit for coming up with this scenario goes to [~junrao] .)
> I tested this in KRaft and confirmed that this sequence is possible (even if 
> perhaps unlikely). There are a few ways we could have potentially detected 
> the issue. First, perhaps the leader should have bumped the leader epoch on 
> all partitions when B was fenced. Then the inflight AlterPartition would be 
> doomed no matter when it arrived.
> Alternatively, we could have relied on the broker epoch to distinguish the 
> dead broker's state from that of the restarted broker. This could be done by 
> including the broker epoch in both the `Fetch` request and in 
> `AlterPartition`.
> Finally, perhaps even normal kafka replication should be using a unique 
> identifier for each disk so that we can reliably detect when it has changed. 
> For example, something like what was proposed for the metadata quorum here: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Voter+Changes].
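The broker-epoch option described in the issue above can be sketched as follows. This is a simplified illustration under stated assumptions: the class and method names are hypothetical, the epoch bookkeeping is reduced to an in-memory map, and nothing here corresponds to Kafka's actual controller code or to the final design in KIP-903. The idea shown is only that an ISR addition is rejected when the epoch the leader observed for the replica no longer matches the broker's current registration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a broker-epoch check; not Kafka's controller code.
final class IsrEpochCheckSketch {
    // Latest epoch assigned to each broker at registration (assumed bookkeeping).
    private final Map<Integer, Long> registeredEpochs = new HashMap<>();
    private long nextEpoch = 1L;

    // Each (re)registration hands the broker a fresh, larger epoch.
    long register(int brokerId) {
        long epoch = nextEpoch++;
        registeredEpochs.put(brokerId, epoch);
        return epoch;
    }

    // Accept an ISR addition only if the epoch the leader observed for the
    // replica still matches the broker's current registration.
    boolean mayJoinIsr(int brokerId, long epochSeenByLeader) {
        Long current = registeredEpochs.get(brokerId);
        return current != null && current == epochSeenByLeader;
    }
}
```

In the scenario above, the in-flight AlterPartition from A would carry the epoch B had before its disk was replaced, so a check of this kind would refuse it once B re-registers with a new epoch.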



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mumrah opened a new pull request, #13197: Minor: Decode the envelope requests for the request log

2023-02-03 Thread via GitHub


mumrah opened a new pull request, #13197:
URL: https://github.com/apache/kafka/pull/13197

   When looking at the request log for anything forwarded to the controller, 
all we see is ENVELOPE and the base64 encoded bytes of the request body. 
   
   This patch decodes the envelope request so it can be fully logged in the 
request log.
   
   Here is a sample output before
   
   > [2023-02-03 16:29:55,747] DEBUG Completed 
request:{"isForwarded":false,"requestHeader":{"requestApiKey":58,"requestApiVersion":0,"correlationId":6,"clientId":"1","requestApiKeyName":"ENVELOPE"},"request":{"requestData":"ACwAAQMADWFkbWluY2xpZW50LTIAAgIFdGVzdAMOc2VnbWVudC5ieXRlcwAHMjA0ODAwAAtzZWdtZW50Lm1zAQAA","requestPrincipal":"AAAFVXNlcgpBTk9OWU1PVVMAAA==","clientHostAddress":"fwAAAQ=="},"response":{"responseData":null,"errorCode":41},"connection":"127.0.0.1:51244-127.0.0.1:51264-1","totalTimeMs":0.362,"requestQueueTimeMs":0.096,"localTimeMs":0.12,"remoteTimeMs":0.0,"throttleTimeMs":0,"responseQueueTimeMs":0.051,"sendTimeMs":0.095,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"EXTERNAL","clientInformation":{"softwareName":"apache-kafka-java","softwareVersion":"unknown"}}
 (kafka.request.logger:275)
   
   and after:
   > [2023-02-03 16:25:17,606] DEBUG Completed 
request:{"isForwarded":false,"requestHeader":{"requestApiKey":58,"requestApiVersion":0,"correlationId":0,"clientId":"0","requestApiKeyName":"ENVELOPE"},"request":{"envelopeRequestHeader":{"requestApiKey":44,"requestApiVersion":1,"correlationId":3,"clientId":"adminclient-2","requestApiKeyName":"INCREMENTAL_ALTER_CONFIGS"},"envelopeRequest":{"resources":[{"resourceType":2,"resourceName":"test","configs":[{"name":"segment.bytes","configOperation":0,"value":"204800"},{"name":"segment.ms","configOperation":1,"value":null}]}],"validateOnly":false}},"response":{"responseData":null,"errorCode":41},"connection":"127.0.0.1:50795-127.0.0.1:50813-0","totalTimeMs":4.067,"requestQueueTimeMs":1.86,"localTimeMs":1.927,"remoteTimeMs":0.0,"throttleTimeMs":0,"responseQueueTimeMs":0.096,"sendTimeMs":0.183,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"EXTERNAL","clientInformation":{"softwareName":"apache-kafka-java","softwareVersion":"unknown"}} (kafka.request.logger:275)
   
   This change slightly increases the logging overhead since it makes a copy of 
the request buffer and runs through the parsing logic. However, this seems like 
a reasonable tradeoff for better visibility.
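As a small illustration of why the undecoded log line is hard to read, the `clientHostAddress` value `fwAAAQ==` in the "before" sample is just the Base64 form of the raw IP address bytes. The sketch below decodes it using only JDK classes. The helper name is hypothetical, and this is not the patch's implementation: decoding the `requestData` body itself needs Kafka's request parsing, which is what the patch adds.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Base64;

// Illustrative helper (hypothetical name); it decodes only the address field,
// not the serialized request body.
final class EnvelopeFieldSketch {
    static String decodeHostAddress(String base64) {
        byte[] raw = Base64.getDecoder().decode(base64);
        try {
            // getByAddress accepts a 4-byte (IPv4) or 16-byte (IPv6) array.
            return InetAddress.getByAddress(raw).getHostAddress();
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("not a 4- or 16-byte address", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(decodeHostAddress("fwAAAQ==")); // prints 127.0.0.1
    }
}
```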





[GitHub] [kafka] dajac commented on a diff in pull request #13195: Minor: Add JmxTool note to 3.5.0 notable changes

2023-02-03 Thread via GitHub


dajac commented on code in PR #13195:
URL: https://github.com/apache/kafka/pull/13195#discussion_r1096238301


##
docs/upgrade.html:
##
@@ -19,6 +19,16 @@
 
 

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13143: KAFKA-14491: [3/N] Add logical key value segments

2023-02-03 Thread via GitHub


vcrfxia commented on code in PR #13143:
URL: https://github.com/apache/kafka/pull/13143#discussion_r1096168089


##
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentTest.java:
##
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LogicalKeyValueSegmentTest {
+
+private static final String STORE_NAME = "physical-rocks";
+private static final String METRICS_SCOPE = "metrics-scope";
+private static final String DB_FILE_DIR = "rocksdb";
+private static final Serializer<String> STRING_SERIALIZER = new StringSerializer();
+private static final Deserializer<String> STRING_DESERIALIZER = new StringDeserializer();
+
+private RocksDBStore physicalStore;
+
+private LogicalKeyValueSegment segment1;
+private LogicalKeyValueSegment segment2;
+
+@Before
+public void setUp() {
+physicalStore = new RocksDBStore(STORE_NAME, DB_FILE_DIR, new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME), false);
+physicalStore.init((StateStoreContext) new InternalMockProcessorContext<>(
+TestUtils.tempDirectory(),
+Serdes.String(),
+Serdes.String(),
+new StreamsConfig(StreamsTestUtils.getStreamsConfig())
+), physicalStore);
+
+segment1 = new LogicalKeyValueSegment(1, "segment-1", physicalStore);
+segment2 = new LogicalKeyValueSegment(2, "segment-2", physicalStore);
+}
+
+@After
+public void tearDown() {
+segment1.close();
+segment2.close();
+physicalStore.close();
+}
+
+@Test
+public void shouldPut() {
+final KeyValue<String, String> kv0 = new KeyValue<>("1", "a");
+final KeyValue<String, String> kv1 = new KeyValue<>("2", "b");
+
+segment1.put(new Bytes(kv0.key.getBytes(UTF_8)), kv0.value.getBytes(UTF_8));
+segment1.put(new Bytes(kv1.key.getBytes(UTF_8)), kv1.value.getBytes(UTF_8));
+segment2.put(new Bytes(kv0.key.getBytes(UTF_8)), kv0.value.getBytes(UTF_8));
+segment2.put(new Bytes(kv1.key.getBytes(UTF_8)), kv1.value.getBytes(UTF_8));
+
+assertEquals("a", getAndDeserialize(segment1, "1"));

Review Comment:
   Ah good point. That's definitely a gap in `shouldPut()` and 
`shouldPutAll()`. All of the other tests are already set up in a way that they 
fail if segments are not properly isolated from each other. Just pushed a fix 
to the two tests which didn't ensure that, and some minor cleanup to a few of 
the other tests.
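The gap discussed here can be illustrated with a small, self-contained sketch. The class below is a hypothetical stand-in for logical segments sharing one physical store. Its key-prefix scheme is an assumption made for the example and is not a description of how `LogicalKeyValueSegment` works internally. The point is that writing the same key with different values to each segment makes a leak between segments observable, whereas writing identical key-value pairs to both would pass even if the segments were not isolated.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for logical segments that share one physical store.
// Isolation here comes from a per-segment key prefix (an assumption made for
// this sketch only).
final class PrefixedSegmentSketch {
    private final Map<String, String> physicalStore;
    private final String prefix;

    PrefixedSegmentSketch(long id, Map<String, String> physicalStore) {
        this.prefix = id + ":";
        this.physicalStore = physicalStore;
    }

    void put(String key, String value) {
        physicalStore.put(prefix + key, value);
    }

    String get(String key) {
        return physicalStore.get(prefix + key);
    }

    public static void main(String[] args) {
        Map<String, String> store = new HashMap<>();
        PrefixedSegmentSketch segment1 = new PrefixedSegmentSketch(1, store);
        PrefixedSegmentSketch segment2 = new PrefixedSegmentSketch(2, store);

        // Same key, different values: if the segments leaked into each other,
        // one of these reads would return the other segment's value.
        segment1.put("1", "a");
        segment2.put("1", "b");
        System.out.println(segment1.get("1")); // prints a
        System.out.println(segment2.get("1")); // prints b
    }
}
```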






[GitHub] [kafka] Hangleton commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.

2023-02-03 Thread via GitHub


Hangleton commented on code in PR #13040:
URL: https://github.com/apache/kafka/pull/13040#discussion_r1095665461


##
storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateManager.java:
##
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Crc32C;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Maintains a mapping from ProducerIds to metadata about the last appended entries (e.g.
+ * epoch, sequence number, last offset, etc.)
+ *
+ * The sequence number is the last number successfully appended to the partition for the given identifier.
+ * The epoch is used for fencing against zombie writers. The offset is the one of the last successful message
+ * appended to the partition.
+ *
+ * As long as a producer id is contained in the map, the corresponding producer can continue to write data.
+ * However, producer ids can be expired due to lack of recent use or if the last written entry has been deleted from
+ * the log (e.g. if the retention policy is "delete"). For compacted topics, the log cleaner will ensure
+ * that the most recent entry from a given producer id is retained in the log provided it hasn't expired due to
+ * age. This ensures that producer ids will not be expired until either the max expiration time has been reached,
+ * or if the topic also is configured for deletion, the segment containing the last written offset has
+ * been deleted.
+ */
+public class ProducerStateManager {
+private static final Logger log = LoggerFactory.getLogger(ProducerStateManager.class.getName());
+
+// Remove these once UnifiedLog moves to storage module.
+public static final String DELETED_FILE_SUFFIX = ".deleted";
+public static final String PRODUCER_SNAPSHOT_FILE_SUFFIX = ".snapshot";
+
+public static final long LATE_TRANSACTION_BUFFER_MS = 5 * 60 * 1000;
+
+private static final short PRODUCER_SNAPSHOT_VERSION = 1;
+private static final String VERSION_FIELD = "version";
+private static final String CRC_FIELD = "crc";
+private static final String PRODUCER_ID_FIELD = "producer_id";
+private static final String LAST_SEQUENCE_FIELD = "last_sequence";
+private static final String PRODUCER_EPOCH_FIELD = "epoch";
+private static final String LAST_OFFSET_FIELD = "last_offset";
+private static final String OFFSET_DELTA_FIELD = "offset_delta";
+private static final String TIMESTAMP_FIELD = "timestamp";
+private static final String PRODUCER_ENTRIES_FIELD = "producer_entries";
+private static final String COORDINATOR_EPOCH_FIELD = "coordinator_epoch";
+private static final String CURRENT_TXN_FIRST_OFFSET_FIELD = "current_txn_first_offset";
+
+private static final int VERSION_OFFSET = 0;
+private static final int CRC_OFFSET = VERSION_OFFSET + 2;
+private static final int PR

[GitHub] [kafka] hachikuji commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API

2023-02-03 Thread via GitHub


hachikuji commented on code in PR #12972:
URL: https://github.com/apache/kafka/pull/12972#discussion_r1096118406


##
clients/src/main/java/org/apache/kafka/common/protocol/Errors.java:
##
@@ -372,7 +375,10 @@ public enum Errors {
 FETCH_SESSION_TOPIC_ID_ERROR(106, "The fetch session encountered inconsistent topic ID usage", FetchSessionTopicIdException::new),
 INELIGIBLE_REPLICA(107, "The new ISR contains at least one ineligible replica.", IneligibleReplicaException::new),
 NEW_LEADER_ELECTED(108, "The AlterPartition request successfully updated the partition state but the leader has changed.", NewLeaderElectedException::new),
-OFFSET_MOVED_TO_TIERED_STORAGE(109, "The requested offset is moved to tiered storage.", OffsetMovedToTieredStorageException::new);
+OFFSET_MOVED_TO_TIERED_STORAGE(109, "The requested offset is moved to tiered storage.", OffsetMovedToTieredStorageException::new),
+FENCED_MEMBER_EPOCH(110, "The member epoch is fenced by the group coordinator. The member must abandon all its partitions and rejoins.", FencedMemberEpochException::new),

Review Comment:
   nit: rejoin (singular)?



##
clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json:
##
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 68,
+  "type": "response",
+  "name": "ConsumerGroupHeartbeatResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  // Supported errors:
+  // - GROUP_AUTHORIZATION_FAILED
+  // - NOT_COORDINATOR
+  // - COORDINATOR_NOT_AVAILABLE
+  // - COORDINATOR_LOAD_IN_PROGRESS
+  // - INVALID_REQUEST
+  // - UNKNOWN_MEMBER_ID
+  // - FENCED_MEMBER_EPOCH
+  // - UNSUPPORTED_ASSIGNOR
+  // - UNRELEASED_INSTANCE_ID
+  // - GROUP_MAX_SIZE_REACHED
+  "fields": [
+{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+  "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
+{ "name": "ErrorCode", "type": "int16", "versions": "0+",
+  "about": "The top-level error code, or 0 if there was no error" },
+{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",

Review Comment:
   I think we have an 'errorMessage' entity type.



##
clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json:
##
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 68,
+  "type": "request",
+  "listeners": ["zkBroker", "broker"],
+  "name": "ConsumerGroupHeartbeatRequest",
+  // The ConsumerGroupHeartbeat API is added as part of KIP-848 and is still
+  // under developement. Hence, the API is not exposed by default by brokers
+  // unless explicitely enabled.
+  "latestVersionUnstable": true,
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
+  "about": "The group identifier." },
+{ "name": "MemberId", "type": "string", "versions": "0+",
+  "about": "The member id generated by the coordinator. The member id must be kept during the entire lifetime of the member." },
+{ "name": "MemberEpoch", "type": "int32", "versions": "0+",
+  "about": "The current member epoch; 0 to join the group; -1 to leave the group; -2 to indicate that the static member will

[GitHub] [kafka] guozhangwang commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

2023-02-03 Thread via GitHub


guozhangwang commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1096112947


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -501,13 +501,16 @@ boolean joinGroupIfNeeded(final Timer timer) {
 }
 
 if (exception instanceof UnknownMemberIdException ||
-exception instanceof IllegalGenerationException ||
-exception instanceof RebalanceInProgressException ||
-exception instanceof MemberIdRequiredException)
+exception instanceof IllegalGenerationException ||
+exception instanceof RebalanceInProgressException ||
+exception instanceof MemberIdRequiredException)
 continue;
 else if (!future.isRetriable())
 throw exception;
 
+if (timer.isExpired()) {

Review Comment:
   Could you add a couple of comments here explaining why we check the timer 
again, in addition to the check on line 452 above? Maybe something like this:
   
   ```
   We check the timer again after calling poll with the timer since it's 
possible that even after the timer has elapsed, the next client.poll(timer) 
would immediately return an error response, which would cause us to not exit 
the while loop.
   ```



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -501,13 +501,16 @@ boolean joinGroupIfNeeded(final Timer timer) {
 }
 
 if (exception instanceof UnknownMemberIdException ||
-exception instanceof IllegalGenerationException ||
-exception instanceof RebalanceInProgressException ||
-exception instanceof MemberIdRequiredException)
+exception instanceof IllegalGenerationException ||
+exception instanceof RebalanceInProgressException ||
+exception instanceof MemberIdRequiredException)

Review Comment:
   Should we actually do the timer check before this? Otherwise, if the 
exception from the immediately returned response is any of those four, we 
would still `continue` and skip the check below.
   
   More concretely, I think we can just move the remaining logic inside the 
`if` branch:
   
   ```
   if (!future.isRetriable()) {
       throw ..
   } else {
       if (timer.isExpired()) {
           return false;
       } else if (exception instanceof ..) {
           continue;
       } else {
           timer.sleep(..)
       }
   }
   ```
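The restructuring suggested above can be sketched as a runnable loop. This is a simplified illustration, not the `AbstractCoordinator` code: `SimpleTimer` and `Attempt` are hypothetical stand-ins defined only for this example, the two predicates replace the concrete exception checks, and the 100 ms backoff is an assumed value. What it shows is that placing the expiry check ahead of the `continue` branch lets the loop exit even when every attempt fails immediately with an exception that would otherwise be retried at once.

```java
import java.util.function.Predicate;

// Hypothetical, simplified retry loop; SimpleTimer and Attempt are stand-ins
// defined here for illustration and are not Kafka classes.
final class JoinLoopSketch {
    static final class SimpleTimer {
        private long remainingMs;
        SimpleTimer(long remainingMs) { this.remainingMs = remainingMs; }
        boolean isExpired() { return remainingMs <= 0; }
        void sleep(long ms) { remainingMs -= ms; } // simulated passage of time
    }

    interface Attempt {
        // Returns null on success, otherwise the failure. Implementations are
        // expected to consume time from the timer, like a poll with a timeout.
        RuntimeException run(SimpleTimer timer);
    }

    static boolean joinIfNeeded(Attempt attempt,
                                SimpleTimer timer,
                                Predicate<RuntimeException> retriable,
                                Predicate<RuntimeException> retryImmediately) {
        while (true) {
            RuntimeException exception = attempt.run(timer);
            if (exception == null) {
                return true;
            }
            if (!retriable.test(exception)) {
                throw exception;
            }
            if (timer.isExpired()) {
                return false; // checked before any `continue`, so the loop cannot spin
            }
            if (retryImmediately.test(exception)) {
                continue;
            }
            timer.sleep(100); // assumed backoff for this sketch
        }
    }
}
```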






[GitHub] [kafka] jeqo commented on a diff in pull request #13177: [KAFKA-14441] Benchmark performance impact of metrics library

2023-02-03 Thread via GitHub


jeqo commented on code in PR #13177:
URL: https://github.com/apache/kafka/pull/13177#discussion_r1096076032


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/metrics/SensorBenchmark.java:
##
@@ -0,0 +1,145 @@
+package org.apache.kafka.jmh.common.metrics;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)

Review Comment:
   Sure! I have added SampleTime, and got some interesting stats:
   
   ```
   Benchmark                                       (numSensors)  Mode  Cnt      Score       Error  Units
   SensorBenchmark.recordBenchmark                            1  avgt    4    122.285 ±    3.831  ns/op
   SensorBenchmark.recordBenchmark                            5  avgt    4    535.055 ±   10.002  ns/op
   SensorBenchmark.recordBenchmark                           20  avgt    4   2153.720 ±  562.511  ns/op
   SensorBenchmark.recordWithAvgBenchmark                     1  avgt    4    148.655 ±    7.245  ns/op
   SensorBenchmark.recordWithAvgBenchmark                     5  avgt    4    801.836 ±   25.243  ns/op
   SensorBenchmark.recordWithAvgBenchmark                    20  avgt    4   5934.782 ±  463.325  ns/op
   SensorBenchmark.recordWithMaxAvgBenchmark                  1  avgt    4    153.731 ±    8.982  ns/op
   SensorBenchmark.recordWithMaxAvgBenchmark                  5  avgt    4   1067.862 ±   68.656  ns/op
   SensorBenchmark.recordWithMaxAvgBenchmark                 20  avgt    4  10486.323 ±  209.097  ns/op
   SensorBenchmark.recordWithMaxBenchmark                     1  avgt    4    148.492 ±   20.097  ns/op
   SensorBenchmark.recordWithMaxBenchmark                     5  avgt    4    807.746 ±  300.785  ns/op
   SensorBenchmark.recordWithMaxBenchmark                    20  avgt    4   6361.096 ± 1160.907  ns/op
   SensorBenchmark.recordWithMinMaxAvgBenchmark               1  avgt    4    178.676 ±   12.371  ns/op
   SensorBenchmark.recordWithMinMaxAvgBenchmark               5  avgt    4   1749.455 ±  204.069  ns/op
   SensorBenchmark.recordWithMinMaxAvgBenchmark              20  avgt    4  25027.613 ± 1407.740  ns/op
   SensorBenchmark.recordWithMinMaxBenchmark                  1  avgt    4    158.032 ±    9.249  ns/op
   SensorBenchmark.recordWithMinMaxBenchmark                  5  avgt    4   1001.310 ±   48.258  ns/op
   SensorBenchmark.recordWithMinMaxBenchmark                 20  avgt    4   8958.896 ±  264.169  ns/op
   SensorBenchmark.recordWithPercentileBenchmark              1  avgt    4    133.637 ±    7.675  ns/op
   SensorBenchmark.recordWithPercentileBenchmark              5  avgt    4    766.876 ±   14.212  ns/op
   SensorBenchmark.recordWithPercentileBenchmark             20  avgt    4   5661.163 ±  463.749  ns/op
   SensorBenchmark.recordWithWindowedSumBenchmark             1  avgt    4    119.896 ±   13.392  ns/op
   SensorBenchmark.recordWithWindowedSumBenchmark             5  avgt    4    663.232 ±    9.704 

[GitHub] [kafka] jeqo commented on a diff in pull request #13177: [KAFKA-14441] Benchmark performance impact of metrics library

2023-02-03 Thread via GitHub


jeqo commented on code in PR #13177:
URL: https://github.com/apache/kafka/pull/13177#discussion_r1096074802


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/metrics/SensorBenchmark.java:
##
@@ -0,0 +1,145 @@
+package org.apache.kafka.jmh.common.metrics;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)

Review Comment:
   > Unsure how representative the benchmark is for high-throughput invocations 
in production environments.
   
   Yeah, I don't think these are highly representative, but they are a starting 
point for comparing stats and seeing how adding either sensors or stats affects 
latency (which is the concern from KIP-864). 
   Though, I'm happy to hear thoughts on how to make these more solid, e.g. I 
have an idea to also add a benchmark of Yammer metrics to compare the impact of 
these two libraries.
   
   Also, I found there was some earlier work around it: 
   - [MetricsBench](https://github.com/apache/kafka/blob/a8aedc85ebfadcf1472acafe2e0311a73d3040be/clients/src/test/java/org/apache/kafka/test/MetricsBench.java): 
a simple average execution time of a sensor plus a bunch of stats.
   - [Microbenchmarks](https://github.com/apache/kafka/blob/00e5803cd3af89011254e734232308618403309d/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java), 
which seems more elaborate, but I haven't looked into the details yet.
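
As a rough illustration of what "how adding sensors or stats affects latency" means in numbers, the sketch below divides the `avgt` scores posted earlier on this PR by `numSensors`. Only the scores are taken from that comment; the class and method names are made up for this example, and a single-fork run with wide error bars should be read as indicative at best.

```java
// Illustrative arithmetic only; scores are the avgt values (ns/op) quoted earlier on this PR.
final class PerSensorCostSketch {

    /** Average cost attributed to one sensor for a given benchmark score. */
    static double perSensor(double scoreNsPerOp, int numSensors) {
        return scoreNsPerOp / numSensors;
    }

    public static void main(String[] args) {
        int[] numSensors = { 1, 5, 20 };
        double[] noStats = { 122.285, 535.055, 2153.720 };      // recordBenchmark
        double[] minMaxAvg = { 178.676, 1749.455, 25027.613 };  // recordWithMinMaxAvgBenchmark

        for (int i = 0; i < numSensors.length; i++) {
            System.out.println(String.format(
                "numSensors=%d  noStats=%.1f ns/sensor  minMaxAvg=%.1f ns/sensor",
                numSensors[i],
                perSensor(noStats[i], numSensors[i]),
                perSensor(minMaxAvg[i], numSensors[i])));
        }
    }
}
```

With these inputs the per-sensor cost without stats stays roughly flat across 1, 5 and 20 sensors, while the per-sensor cost with Min, Max and Avg attached grows as sensors are added, which may be worth a closer look.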






[GitHub] [kafka] jeqo commented on a diff in pull request #13177: [KAFKA-14441] Benchmark performance impact of metrics library

2023-02-03 Thread via GitHub


jeqo commented on code in PR #13177:
URL: https://github.com/apache/kafka/pull/13177#discussion_r1096069586


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/metrics/SensorBenchmark.java:
##
@@ -0,0 +1,145 @@
+package org.apache.kafka.jmh.common.metrics;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)

Review Comment:
   Sure, I reduced it for testing purposes. I'm increasing it to 15, similar to 
other benchmarks, since I realized it's possible to override these via args.






[GitHub] [kafka] jeqo commented on a diff in pull request #13177: [KAFKA-14441] Benchmark performance impact of metrics library

2023-02-03 Thread via GitHub


jeqo commented on code in PR #13177:
URL: https://github.com/apache/kafka/pull/13177#discussion_r1096068517


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/metrics/SensorBenchmark.java:
##
@@ -0,0 +1,145 @@
+package org.apache.kafka.jmh.common.metrics;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class SensorBenchmark {
+
+@State(Scope.Benchmark)
+public static class SensorWithNoStats {
+
+Metrics metrics = new Metrics();
+
+List<Sensor> sensors = new ArrayList<>();
+@Param({"1", "5", "20"})

Review Comment:
   Yes, e.g. a Sink task iteration has around 5 sensor records. I'm adding a 
comment to clarify this. Thanks!
   






[jira] [Assigned] (KAFKA-7735) StateChangeLogMerger tool can not work due to incorrect topic regular matches

2023-02-03 Thread Federico Valeri (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Federico Valeri reassigned KAFKA-7735:
--

Assignee: Federico Valeri

> StateChangeLogMerger tool can not work due to incorrect topic regular matches
> -
>
> Key: KAFKA-7735
> URL: https://issues.apache.org/jira/browse/KAFKA-7735
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 2.0.0
>Reporter: Fangbin Sun
>Assignee: Federico Valeri
>Priority: Major
>
> When the StateChangeLogMerger tool tries to obtain a topic's state-change log, it 
> returns nothing.
> {code:java}
> bin/kafka-run-class.sh com.cmss.kafka.api.StateChangeLogMerger --logs 
> state-change.log --topic test{code}
> This tool uses a topic partition regex as follows:
> {code:java}
> val topicPartitionRegex = new Regex("\\[(" + Topic.LEGAL_CHARS + "+),( 
> )*([0-9]+)\\]"){code}
> However, the state-change log no longer prints entries in the above format. E.g. 
> in 0.10.2.0, some state-change logs are printed via the case class 
> TopicAndPartition, whose toString is overridden as follows:
> {code:java}
> override def toString = "[%s,%d]".format(topic, partition){code}
> In newer versions (e.g. 1.0.0+), most state-change logs are printed in the 
> form "partition $topic-$partition". As a workaround, one can modify the 
> topic partition regex like this:
> {code:java}
> val topicPartitionRegex = new Regex("(partition " + Topic.LEGAL_CHARS + 
> "+)-([0-9]+)"){code}
> and match the topic with "matcher.group(1).substring(10)", although some of the 
> state-change output might be a little redundant.
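
For reference, here is a small Java sketch of the two formats described in the ticket. It is not the tool's actual code: the value of `LEGAL_CHARS` is inlined as an assumption so the example runs without Kafka on the classpath, the sample log lines are invented, and the newer-format pattern keeps the `partition ` prefix outside the capture group instead of stripping it with `substring(10)`.

```java
// Sketch only; class name, sample lines and the inlined LEGAL_CHARS value are assumptions.
final class TopicPartitionRegexSketch {

    // Assumed to mirror Topic.LEGAL_CHARS; inlined to keep the sketch self-contained.
    static final String LEGAL_CHARS = "[a-zA-Z0-9._-]";

    // Older format produced by "[%s,%d]".format(topic, partition), e.g. "[test,0]".
    static final java.util.regex.Pattern OLD_FORMAT =
        java.util.regex.Pattern.compile("\\[(" + LEGAL_CHARS + "+),( )*([0-9]+)\\]");

    // Newer format "partition $topic-$partition", e.g. "partition test-0".
    static final java.util.regex.Pattern NEW_FORMAT =
        java.util.regex.Pattern.compile("partition (" + LEGAL_CHARS + "+)-([0-9]+)");

    /** Returns "topic/partition" for the first match in the line, or null if neither format matches. */
    static String extract(String line) {
        java.util.regex.Matcher m = NEW_FORMAT.matcher(line);
        if (m.find()) {
            return m.group(1) + "/" + m.group(2);
        }
        m = OLD_FORMAT.matcher(line);
        if (m.find()) {
            return m.group(1) + "/" + m.group(3);
        }
        return null;
    }

    public static void main(String[] args) {
        // Invented sample lines, not real broker output.
        System.out.println(extract("Changed state of partition my-topic-3 to OnlinePartition"));
        System.out.println(extract("state change for [test,0] completed"));
    }
}
```

Because the topic group is greedy and topic names may contain `-`, the pattern backtracks to the last `-` that is followed by digits, so `my-topic-3` is split into topic `my-topic` and partition `3`.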



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] fvaleri commented on pull request #13171: KAFKA-14584: Move StateChangeLogMerger tool

2023-02-03 Thread via GitHub


fvaleri commented on PR #13171:
URL: https://github.com/apache/kafka/pull/13171#issuecomment-1416167959

   @mimaison thanks for spotting this! 
   
   I saw that ticket, but I started testing with an old log file and then 
forgot to also test with a recent log format. I agree that this tool is not 
much used, but maybe the logic can be reused to create something similar.
   
   With the latest commit, I made it work with the latest state change log 
format and addressed your comments. Before merging, we need to rebase on 
https://github.com/apache/kafka/pull/13195, so that I can also add this tool to 
the notable changes note.
   





[jira] [Updated] (KAFKA-14133) Remaining EasyMock to Mockito tests

2023-02-03 Thread Christo Lolov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov updated KAFKA-14133:
--
Description: 
{color:#de350b}There are tests which use both PowerMock and EasyMock. I have 
put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely 
solely on EasyMock.{color}

Unless stated in brackets the tests are in the streams module.

A list of tests which still require to be moved from EasyMock to Mockito as of 
2nd of August 2022 which do not have a Jira issue and do not have pull requests 
I am aware of which are opened:

{color:#ff8b00}In Review{color}
{color:#00875a}Merged{color}
 # {color:#00875a}WorkerConnectorTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}WorkerCoordinatorTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}RootResourceTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}ByteArrayProducerRecordEquals{color} (connect) (owner: 
[~yash.mayya] )
 # {color:#00875a}KStreamFlatTransformTest{color} (owner: Christo)
 # {color:#00875a}KStreamFlatTransformValuesTest{color} (owner: Christo)
 # {color:#00875a}KStreamPrintTest{color} (owner: Christo)
 # {color:#00875a}KStreamRepartitionTest{color} (owner: Christo)
 # {color:#00875a}MaterializedInternalTest{color} (owner: Christo)
 # {color:#00875a}TransformerSupplierAdapterTest{color} (owner: Christo)
 # {color:#00875a}KTableSuppressProcessorMetricsTest{color} (owner: Christo)
 # {color:#00875a}ClientUtilsTest{color} (owner: Christo)
 # {color:#00875a}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: 
Christo)
 # {color:#00875a}TopologyTest{color} (owner: Christo)
 # {color:#00875a}KTableSuppressProcessorTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingSessionBytesStoreTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingTimestampedWindowBytesStoreTest{color} (owner: 
Christo)
 # {color:#00875a}ChangeLoggingWindowBytesStoreTest{color} (owner: Christo)
 # {color:#00875a}MeteredTimestampedWindowStoreTest{color} (owner: Christo)
 # {color:#00875a}StreamsRebalanceListenerTest{color} (owner: Christo)
 # {color:#00875a}TimestampedKeyValueStoreMaterializerTest{color} (owner: 
Christo)
 # {color:#00875a}CachingInMemoryKeyValueStoreTest{color} (owner: Christo)
 # {color:#00875a}CachingInMemorySessionStoreTest{color} (owner: Christo)
 # {color:#00875a}CachingPersistentSessionStoreTest{color} (owner: Christo)
 # {color:#00875a}CachingPersistentWindowStoreTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingKeyValueBytesStoreTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingTimestampedKeyValueBytesStoreTest{color} (owner: 
Christo)
 # {color:#00875a}CompositeReadOnlyWindowStoreTest{color} (owner: Christo)
 # {color:#00875a}KeyValueStoreBuilderTest{color} (owner: Christo)
 # {color:#00875a}RocksDBStoreTest{color} (owner: Christo)
 # {color:#00875a}StreamThreadStateStoreProviderTest{color} (owner: Christo)
 # {color:#ff8b00}TaskManagerTest{color} (owner: Christo)
 # {color:#00875a}InternalTopicManagerTest{color} (owner: Christo)
 # {color:#00875a}ProcessorContextImplTest{color} (owner: Christo)
 # {color:#00875a}WriteConsistencyVectorTest{color} (owner: Christo)
 # {color:#00875a}StreamsAssignmentScaleTest{color} (owner: Christo)
 # {color:#00875a}StreamsPartitionAssignorTest{color} (owner: Christo)
 # {color:#00875a}AssignmentTestUtils{color} (owner: Christo)
 # {color:#ff8b00}ProcessorStateManagerTest{color} (owner: Matthew)
 # {color:#ff8b00}StandbyTaskTest{color} (owner: Matthew)
 # {color:#ff8b00}StoreChangelogReaderTest{color} (owner: Matthew)
 # {color:#ff8b00}StreamTaskTest{color} (owner: Matthew)
 # {color:#ff8b00}StreamThreadTest{color} (owner: Matthew)
 # {color:#ff8b00}StreamsMetricsImplTest{color} (owner: Dalibor) (Captured in 
https://issues.apache.org/jira/browse/KAFKA-12947)
 # {color:#ff8b00}TimeOrderedCachingPersistentWindowStoreTest{color} (owner: 
[~shekharrajak]) [https://github.com/apache/kafka/pull/12739] 
 # {color:#00875a}TimeOrderedWindowStoreTest{color} (owners: [~shekharrajak]) 
[https://github.com/apache/kafka/pull/12777] 

*The coverage report for the above tests after the change should be >= to what 
the coverage is now.*

  was:
{color:#de350b}There are tests which use both PowerMock and EasyMock. I have 
put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely 
solely on EasyMock.{color}

Unless stated in brackets the tests are in the streams module.

A list of tests which still require to be moved from EasyMock to Mockito as of 
2nd of August 2022 which do not have a Jira issue and do not have pull requests 
I am aware of which are opened:

{color:#ff8b00}In Review{color}
{color:#00875a}Merged{color}
 # {color:#00875a}WorkerConnectorTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}WorkerCoordinatorTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}RootResourceTest

[GitHub] [kafka] clolov commented on pull request #12777: Replace EasyMock and PowerMock with Mockito - TimeOrderedWindowStoreTest

2023-02-03 Thread via GitHub


clolov commented on PR #12777:
URL: https://github.com/apache/kafka/pull/12777#issuecomment-1416159492

   Great, thank you very much @guozhangwang!





[GitHub] [kafka] fvaleri commented on a diff in pull request #13171: KAFKA-14584: Move StateChangeLogMerger tool

2023-02-03 Thread via GitHub


fvaleri commented on code in PR #13171:
URL: https://github.com/apache/kafka/pull/13171#discussion_r1096043542


##
tools/src/main/java/org/apache/kafka/tools/StateChangeLogMerger.java:
##
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.ToolsUtils;
+
+import java.io.File;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.Arrays;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Scanner;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+
+/**
+ * A utility that merges the state change logs (possibly obtained from 
different brokers and over multiple days).
+ *
+ * This utility expects at least one of the following two arguments -
+ * 1. A list of state change log files
+ * 2. A regex to specify state change log file names.
+ *
+ * This utility optionally also accepts the following arguments -
+ * 1. The topic whose state change logs should be merged
+ * 2. A list of partitions whose state change logs should be merged (can be 
specified only when the topic argument
+ * is explicitly specified)
+ * 3. Start time from when the logs should be merged
+ * 4. End time until when the logs should be merged
+ */
+public class StateChangeLogMerger {
+private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss,SSS";
+private static final Pattern DATE_PATTERN = Pattern.compile("([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}){1}");
+private static final Pattern TOPIC_PART_PATTERN = Pattern.compile("\\[(" + Topic.LEGAL_CHARS + "+),([0-9]+)\\]");
+
+public static void main(String[] args) {
+try {
+StateChangeLogMergerOptions options = new 
StateChangeLogMergerOptions(args);
+if (printHelpOrVersion(options) || !validInput(options)) return;

Review Comment:
   Yes, that was the original intention. Thanks.






[jira] [Commented] (KAFKA-14132) Remaining PowerMock to Mockito tests

2023-02-03 Thread Hector Geraldino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17683961#comment-17683961
 ] 

Hector Geraldino commented on KAFKA-14132:
--

Hey [~divijvaidya],

I'm putting together a KIP around https://github.com/apache/kafka/pull/13193, 
in which I'll propose adding new metric counters for filtered (skipped) 
records. The POC is code complete, but for it to be ready I need to add new 
test cases to the *WorkerSinkTaskTest* and *WorkerSinkTaskThreadedTest* unit 
tests. These two are assigned to you, and I was wondering if you're OK with me 
picking them up, as this will unblock my other work. I did the same for 
https://issues.apache.org/jira/browse/KAFKA-14659 (I had to raise 
https://github.com/apache/kafka/pull/13191 before adding tests for that bug 
fix).

Wdyt?

> Remaining PowerMock to Mockito tests
> 
>
> Key: KAFKA-14132
> URL: https://issues.apache.org/jira/browse/KAFKA-14132
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>
> {color:#de350b}Some of the tests below use EasyMock as well. For those 
> migrate both PowerMock and EasyMock to Mockito.{color}
> Unless stated in brackets the tests are in the connect module.
> A list of tests which still require to be moved from PowerMock to Mockito as 
> of 2nd of August 2022 which do not have a Jira issue and do not have pull 
> requests I am aware of which are opened:
> {color:#ff8b00}InReview{color}
> {color:#00875a}Merged{color}
>  # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak])
>  # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo)
>  # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij)
>  # WorkerSinkTaskTest (owner: Divij) *WIP* 
>  # WorkerSinkTaskThreadedTest (owner: Divij) *WIP*
>  # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven])
>  # {color:#ff8b00}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven]) 
> ([https://github.com/apache/kafka/pull/12728])
>  # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven])
>  # {color:#00875a}KafkaOffsetBackingStoreTest{color} (owner: Christo) 
> ([https://github.com/apache/kafka/pull/12418])
>  # {color:#ff8b00}KafkaBasedLogTest{color} (owner: [~mdedetrich-aiven])
>  # RetryUtilTest (owner: [~mdedetrich-aiven] )
>  # {color:#00875a}RepartitionTopicTest{color} (streams) (owner: Christo)
>  # {color:#00875a}StateManagerUtilTest{color} (streams) (owner: Christo)
> *The coverage report for the above tests after the change should be >= to 
> what the coverage is now.*





[jira] [Comment Edited] (KAFKA-14660) Divide by zero security vulnerability (sonatype-2019-0422)

2023-02-03 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17683951#comment-17683951
 ] 

Matthias J. Sax edited comment on KAFKA-14660 at 2/3/23 4:52 PM:
-

Not sure why the PR was not auto-linked... Fixed.

[https://github.com/apache/kafka/pull/13175]

Thanks for your follow up. Can we close this ticket? Let me know if there is 
anything else I can do.


was (Author: mjsax):
Not sure why the PR was not auto-linked... Fixed.

[https://github.com/apache/kafka/pull/13175]

> Divide by zero security vulnerability (sonatype-2019-0422)
> --
>
> Key: KAFKA-14660
> URL: https://issues.apache.org/jira/browse/KAFKA-14660
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.2
>Reporter: Andy Coates
>Assignee: Matthias J. Sax
>Priority: Minor
> Fix For: 3.5.0
>
>
> Looks like SonaType has picked up a "Divide by Zero" issue reported in a PR 
> and, because the PR was never merged, is now reporting it as a security 
> vulnerability in the latest Kafka Streams library.
>  
> See:
>  * [Vulnerability: sonatype-2019-0422](https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven&component-name=org.apache.kafka%2Fkafka-streams&utm_source=ossindex-client&utm_medium=integration&utm_content=1.7.0)
>  * [Original PR](https://github.com/apache/kafka/pull/7414)
>  
> While it looks from the comments made by [~mjsax] and [~bbejeck] that the 
> divide-by-zero is not really an issue, the fact that it's now being reported 
> as a vulnerability is, especially with regulators.
> PITA, but we should consider either getting this vulnerability removed 
> (Google wasn't very helpful in providing info on how to do this), or fixed 
> (Again, not sure how to tag the fix as fixing this issue).  One option may 
> just be to reopen the PR and merge (and then fix forward by switching it to 
> throw an exception).
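
The code touched by PR #7414 is not shown in this thread, so the following is only a generic sketch of the "throw an exception" option mentioned above. The method name, parameters and exception type are hypothetical. In Java, integer division by zero already throws `ArithmeticException`; an explicit guard mainly makes the failure mode deliberate and the message clearer.

```java
// Hypothetical helper; not taken from Kafka Streams or from PR #7414.
final class DivideGuardSketch {

    /** Divides total by count, rejecting a zero count with a descriptive error. */
    static long averagePerUnit(long total, long count) {
        if (count == 0) {
            throw new IllegalArgumentException("count must be non-zero");
        }
        return total / count;
    }

    public static void main(String[] args) {
        System.out.println(averagePerUnit(10, 2));
        try {
            averagePerUnit(10, 0);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```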





[jira] [Commented] (KAFKA-14660) Divide by zero security vulnerability (sonatype-2019-0422)

2023-02-03 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17683951#comment-17683951
 ] 

Matthias J. Sax commented on KAFKA-14660:
-

Not sure why the PR was not auto-linked... Fixed.

[https://github.com/apache/kafka/pull/13175]






[GitHub] [kafka] mimaison commented on a diff in pull request #13171: KAFKA-14584: Move StateChangeLogMerger tool

2023-02-03 Thread via GitHub


mimaison commented on code in PR #13171:
URL: https://github.com/apache/kafka/pull/13171#discussion_r1094648508


##
tools/src/main/java/org/apache/kafka/tools/StateChangeLogMerger.java:
##
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.ToolsUtils;
+
+import java.io.File;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.Arrays;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Scanner;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+
+/**
+ * A utility that merges the state change logs (possibly obtained from 
different brokers and over multiple days).
+ *
+ * This utility expects at least one of the following two arguments -
+ * 1. A list of state change log files
+ * 2. A regex to specify state change log file names.
+ *
+ * This utility optionally also accepts the following arguments -
+ * 1. The topic whose state change logs should be merged
+ * 2. A list of partitions whose state change logs should be merged (can be specified only when the topic argument
+ * is explicitly specified)
+ * 3. Start time from when the logs should be merged
+ * 4. End time until when the logs should be merged
+ */
+public class StateChangeLogMerger {
+    private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss,SSS";
+    private static final Pattern DATE_PATTERN = Pattern.compile("([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}){1}");
+    private static final Pattern TOPIC_PART_PATTERN = Pattern.compile("\\[(" + Topic.LEGAL_CHARS + "+),([0-9]+)\\]");
+
+    public static void main(String[] args) {
+        try {
+            StateChangeLogMergerOptions options = new StateChangeLogMergerOptions(args);
+            if (printHelpOrVersion(options) || !validInput(options)) return;

Review Comment:
   What about doing:
   ```
   CommandLineUtils.maybePrintHelpOrVersion(options,
       "A tool for merging the log files from several brokers to reconstruct a unified history of what happened.");
   validInput(options);
   ```
   And remove the `printHelpOrVersion()` method and have `validInput()` return `void`?



##
tools/src/test/java/org/apache/kafka/tools/StateChangeLogMergerTest.java:
##
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.Exit;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java

[jira] [Updated] (KAFKA-14673) Add high watermark listener to Partition/Log layers

2023-02-03 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-14673:

Summary: Add high watermark listener to Partition/Log layers  (was: High watermark listener)

> Add high watermark listener to Partition/Log layers
> ---
>
> Key: KAFKA-14673
> URL: https://issues.apache.org/jira/browse/KAFKA-14673
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac opened a new pull request, #13196: KAFKA-14673; Add high watermark listener to Partition/Log layers

2023-02-03 Thread via GitHub


dajac opened a new pull request, #13196:
URL: https://github.com/apache/kafka/pull/13196

   In the context of KIP-848, we are implementing a new group coordinator in Java. 
This new coordinator follows the architecture of the new quorum controller. It 
is basically a replicated state machine that writes to the log and commits its 
internal state when the writes are committed. At the moment, the only way to 
know when a write is committed is to use a delayed fetch. This is not ideal in 
our context because a delayed fetch can be completed before the write is 
actually committed to the log.
   
   This patch proposes to introduce a high watermark listener to the 
Partition/Log layers. This will allow the new group coordinator to simply 
listen to changes and commit its state based on this. This mechanism is simpler 
and lighter as well.
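
As a rough illustration of the listener approach described above, the sketch below shows a log that notifies registered listeners when its high watermark advances, and a toy state machine that commits pending writes in response. All names (`HighWatermarkListener`, `SketchLog`, `SketchCoordinator`) are hypothetical and chosen for illustration only; they are not taken from the patch, and the real Partition/Log API may look quite different.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical callback interface; the interface added by the patch may differ.
interface HighWatermarkListener {
    void onHighWatermarkUpdated(long newHighWatermark);
}

// Toy stand-in for a partition log that tracks a high watermark.
class SketchLog {
    private final List<HighWatermarkListener> listeners = new CopyOnWriteArrayList<>();
    private long highWatermark = 0L;

    void addListener(HighWatermarkListener listener) {
        listeners.add(listener);
    }

    // Listeners are notified only when the high watermark actually advances.
    synchronized void updateHighWatermark(long newHighWatermark) {
        if (newHighWatermark <= highWatermark) {
            return;
        }
        highWatermark = newHighWatermark;
        for (HighWatermarkListener listener : listeners) {
            listener.onHighWatermarkUpdated(newHighWatermark);
        }
    }
}

// Toy replicated state machine: a write counts as committed once the
// high watermark has moved past its offset.
class SketchCoordinator implements HighWatermarkListener {
    private final List<Long> pendingOffsets = new ArrayList<>();
    private final List<Long> committedOffsets = new ArrayList<>();

    synchronized void append(long offset) {
        pendingOffsets.add(offset);
    }

    @Override
    public synchronized void onHighWatermarkUpdated(long newHighWatermark) {
        Iterator<Long> it = pendingOffsets.iterator();
        while (it.hasNext()) {
            long offset = it.next();
            // Offsets strictly below the high watermark are committed.
            if (offset < newHighWatermark) {
                committedOffsets.add(offset);
                it.remove();
            }
        }
    }

    synchronized List<Long> committed() {
        return new ArrayList<>(committedOffsets);
    }
}
```

Compared with a delayed fetch, the coordinator in this sketch never polls: it reacts only when the log reports that the high watermark has moved.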
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-5756) Synchronization issue on flush

2023-02-03 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17683888#comment-17683888
 ] 

Mickael Maison commented on KAFKA-5756:
---

As far as I can tell this is still an issue. [~gharris1727] Were you planning 
to propose a fix?

> Synchronization issue on flush
> --
>
> Key: KAFKA-5756
> URL: https://issues.apache.org/jira/browse/KAFKA-5756
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Oleg Kuznetsov
>Priority: Major
> Fix For: 0.11.0.1, 1.0.0
>
>
> Access to *OffsetStorageWriter#toFlush* is not synchronized in *doFlush()* 
> method, whereas this collection can be accessed from 2 different threads:
> - *WorkerSourceTask.execute()*, finally block
> - *SourceTaskOffsetCommitter*, from periodic flush task
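
The race described in the issue can be illustrated with a simplified writer in which every access to the flush buffer happens under the same lock. This is only a sketch under stated assumptions: `SketchOffsetWriter` and its methods are hypothetical, loosely modelled on the description above, and do not reproduce the real `OffsetStorageWriter` or any proposed fix.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified writer; not the real OffsetStorageWriter.
class SketchOffsetWriter {
    private Map<String, Long> data = new HashMap<>();
    private Map<String, Long> toFlush = null;

    synchronized void offset(String partition, long offset) {
        data.put(partition, offset);
    }

    // Moves pending offsets into the flush buffer; returns false if there is nothing to flush.
    synchronized boolean beginFlush() {
        if (toFlush != null) {
            throw new IllegalStateException("A flush is already in progress");
        }
        if (data.isEmpty()) {
            return false;
        }
        toFlush = data;
        data = new HashMap<>();
        return true;
    }

    // Reads and clears toFlush under the lock, so two threads calling this
    // concurrently can never both observe the same buffer.
    Map<String, Long> doFlush() {
        Map<String, Long> snapshot;
        synchronized (this) {
            if (toFlush == null) {
                return new HashMap<>();
            }
            snapshot = new HashMap<>(toFlush);
            toFlush = null;
        }
        // A real writer would hand the snapshot to its backing store here.
        return snapshot;
    }
}
```

The point of the sketch is only that the buffer is copied and cleared inside the synchronized block, while the slow write to the backing store can happen outside it.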





[jira] [Assigned] (KAFKA-14577) Move ConsoleProducer to tools

2023-02-03 Thread Alexandre Dupriez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexandre Dupriez reassigned KAFKA-14577:
-

Assignee: Alexandre Dupriez

> Move ConsoleProducer to tools
> -
>
> Key: KAFKA-14577
> URL: https://issues.apache.org/jira/browse/KAFKA-14577
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Alexandre Dupriez
>Priority: Major
>






[GitHub] [kafka] C0urante merged pull request #13194: MINOR: Update incorrect / misleading comment regarding rebalance exceptions in WorkerSinkTask

2023-02-03 Thread via GitHub


C0urante merged PR #13194:
URL: https://github.com/apache/kafka/pull/13194





[jira] [Assigned] (KAFKA-14577) Move ConsoleProducer to tools

2023-02-03 Thread Omnia Ibrahim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Omnia Ibrahim reassigned KAFKA-14577:
-

Assignee: (was: Omnia Ibrahim)

> Move ConsoleProducer to tools
> -
>
> Key: KAFKA-14577
> URL: https://issues.apache.org/jira/browse/KAFKA-14577
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Priority: Major
>






[jira] [Assigned] (KAFKA-14579) Move DumpLogSegments to tools

2023-02-03 Thread Omnia Ibrahim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Omnia Ibrahim reassigned KAFKA-14579:
-

Assignee: (was: Omnia Ibrahim)

> Move DumpLogSegments to tools
> -
>
> Key: KAFKA-14579
> URL: https://issues.apache.org/jira/browse/KAFKA-14579
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Priority: Major
>






[GitHub] [kafka] Hangleton commented on a diff in pull request #11627: KAFKA-13565: add consumer exponential backoff for KIP-580

2023-02-03 Thread via GitHub


Hangleton commented on code in PR #11627:
URL: https://github.com/apache/kafka/pull/11627#discussion_r1095735143


##
clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java:
##
@@ -32,7 +32,16 @@
 private final double expMax;
 private final long initialInterval;
 private final double jitter;
+private long attemptedCount = 0;

Review Comment:
   Is it guaranteed that instances of this class are confined to one thread, so that no synchronization is needed?
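
If instances could be shared across threads, one option would be to keep the attempt counter in an `AtomicLong`. The sketch below is a hypothetical, jitter-free class (`SketchBackoff`) written for illustration; it is not Kafka's `ExponentialBackoff` and makes no claim about how the PR resolves the question.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch, not Kafka's ExponentialBackoff: the attempt counter
// is an AtomicLong so that concurrent callers each observe a distinct attempt.
class SketchBackoff {
    private final long initialIntervalMs;
    private final int multiplier;
    private final long maxIntervalMs;
    private final AtomicLong attemptedCount = new AtomicLong(0);

    SketchBackoff(long initialIntervalMs, int multiplier, long maxIntervalMs) {
        this.initialIntervalMs = initialIntervalMs;
        this.multiplier = multiplier;
        this.maxIntervalMs = maxIntervalMs;
    }

    // Returns the delay for the current attempt and atomically advances the counter.
    long nextBackoffMs() {
        long attempt = attemptedCount.getAndIncrement();
        double delay = initialIntervalMs * Math.pow(multiplier, attempt);
        return (long) Math.min(delay, maxIntervalMs);
    }

    void reset() {
        attemptedCount.set(0);
    }
}
```

If the class is in fact confined to a single thread, a plain `long` field is sufficient and avoids the cost of the atomic.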






[GitHub] [kafka] clolov commented on a diff in pull request #13182: KAFKA-14649: Isolate failures during plugin path scanning to single plugin classes

2023-02-03 Thread via GitHub


clolov commented on code in PR #13182:
URL: https://github.com/apache/kafka/pull/13182#discussion_r1095729055


##
connect/runtime/src/test/resources/test-plugins/fail-to-initialize/test/plugins/OuterClass.java:
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+/**
+ * Fake plugin class for testing classloading isolation.
+ * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
+ * Defines a connector as an non-static inner class, which does not have a 
default constructor.

Review Comment:
   ```suggestion
 * Defines a connector as a non-static inner class, which does not have a default constructor.
   ```
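
For context on why such a test plugin fails to initialize, here is a small sketch showing that a non-static inner class has no no-argument constructor visible to reflection, because its constructor implicitly takes the enclosing instance. The class names (`SketchOuter`, `InnerClassDemo`) are hypothetical, and the sketch assumes that plugins are instantiated reflectively through a no-argument constructor.

```java
// Hypothetical classes for illustration; not the test plugin from the PR.
class SketchOuter {
    // Non-static inner class: its constructor implicitly takes a SketchOuter instance.
    class Inner {
    }

    // Static nested class: has a genuine no-argument constructor.
    static class Nested {
    }
}

class InnerClassDemo {
    // Reports whether the class declares a constructor with no parameters.
    static boolean hasNoArgConstructor(Class<?> klass) {
        try {
            klass.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}
```

Calling `InnerClassDemo.hasNoArgConstructor(SketchOuter.Inner.class)` returns `false`, while the same call with `SketchOuter.Nested.class` returns `true`.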



##
connect/runtime/src/test/resources/test-plugins/fail-to-initialize/test/plugins/CoLocatedPlugin.java:
##
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.Map;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.storage.Converter;
+
+/**
+ * Fake plugin class for testing classloading isolation.
+ * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
+ * This is a plugin co-located with other poorly packaged plugins, but should 
be visible despite other errors.

Review Comment:
   ```suggestion
* This is a plugin co-located with other poorly packaged plugins, but 
should be visible despite other errors.
   ```






[GitHub] [kafka] Hangleton commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.

2023-02-03 Thread via GitHub


Hangleton commented on code in PR #13040:
URL: https://github.com/apache/kafka/pull/13040#discussion_r1095665461


##
storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateManager.java:
##
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Crc32C;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Maintains a mapping from ProducerIds to metadata about the last appended entries (e.g.
+ * epoch, sequence number, last offset, etc.)
+ *
+ * The sequence number is the last number successfully appended to the partition for the given identifier.
+ * The epoch is used for fencing against zombie writers. The offset is the one of the last successful message
+ * appended to the partition.
+ *
+ * As long as a producer id is contained in the map, the corresponding producer can continue to write data.
+ * However, producer ids can be expired due to lack of recent use or if the last written entry has been deleted from
+ * the log (e.g. if the retention policy is "delete"). For compacted topics, the log cleaner will ensure
+ * that the most recent entry from a given producer id is retained in the log provided it hasn't expired due to
+ * age. This ensures that producer ids will not be expired until either the max expiration time has been reached,
+ * or if the topic also is configured for deletion, the segment containing the last written offset has
+ * been deleted.
+ */
+public class ProducerStateManager {
+private static final Logger log = LoggerFactory.getLogger(ProducerStateManager.class.getName());
+
+// Remove these once UnifiedLog moves to storage module.
+public static final String DELETED_FILE_SUFFIX = ".deleted";
+public static final String PRODUCER_SNAPSHOT_FILE_SUFFIX = ".snapshot";
+
+public static final long LATE_TRANSACTION_BUFFER_MS = 5 * 60 * 1000;
+
+private static final short PRODUCER_SNAPSHOT_VERSION = 1;
+private static final String VERSION_FIELD = "version";
+private static final String CRC_FIELD = "crc";
+private static final String PRODUCER_ID_FIELD = "producer_id";
+private static final String LAST_SEQUENCE_FIELD = "last_sequence";
+private static final String PRODUCER_EPOCH_FIELD = "epoch";
+private static final String LAST_OFFSET_FIELD = "last_offset";
+private static final String OFFSET_DELTA_FIELD = "offset_delta";
+private static final String TIMESTAMP_FIELD = "timestamp";
+private static final String PRODUCER_ENTRIES_FIELD = "producer_entries";
+private static final String COORDINATOR_EPOCH_FIELD = "coordinator_epoch";
+private static final String CURRENT_TXN_FIRST_OFFSET_FIELD = "current_txn_first_offset";
+
+private static final int VERSION_OFFSET = 0;
+private static final int CRC_OFFSET = VERSION_OFFSET + 2;
+private static final int PR

[GitHub] [kafka] clolov commented on pull request #13193: KAFKA-14659 source-record-write-[rate|total] metrics include filtered records

2023-02-03 Thread via GitHub


clolov commented on PR #13193:
URL: https://github.com/apache/kafka/pull/13193#issuecomment-1415746102

   Thank you for spotting this behaviour and contributing a fix! The proposed solution makes sense to me. I will just add @C0urante to put the appropriate tags for "connect" on this pull request and review it once it has been rebased.





[jira] [Commented] (KAFKA-14582) Move JmxTool to tools

2023-02-03 Thread Federico Valeri (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17683838#comment-17683838
 ] 

Federico Valeri commented on KAFKA-14582:
-

https://github.com/apache/kafka/pull/13195

> Move JmxTool to tools
> -
>
> Key: KAFKA-14582
> URL: https://issues.apache.org/jira/browse/KAFKA-14582
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Federico Valeri
>Priority: Major
> Fix For: 3.5.0
>
>






[GitHub] [kafka] clolov commented on pull request #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation

2023-02-03 Thread via GitHub


clolov commented on PR #13184:
URL: https://github.com/apache/kafka/pull/13184#issuecomment-1415734118

   I see what you mean. Okay, I am happy with it. Could you also rename the variables from `transformation` to `transformationStage`?





[jira] [Commented] (KAFKA-14582) Move JmxTool to tools

2023-02-03 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17683796#comment-17683796
 ] 

Mickael Maison commented on KAFKA-14582:


Can you open a small PR to add this to the docs? For each version we have a 
short Notable Changes section. For example: 
https://kafka.apache.org/documentation/#upgrade_33_notable

> Move JmxTool to tools
> -
>
> Key: KAFKA-14582
> URL: https://issues.apache.org/jira/browse/KAFKA-14582
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Federico Valeri
>Priority: Major
> Fix For: 3.5.0
>
>






[jira] [Commented] (KAFKA-14593) Move LeaderElectionCommand to tools

2023-02-03 Thread Alexandre Dupriez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17683783#comment-17683783
 ] 

Alexandre Dupriez commented on KAFKA-14593:
---

Hello [~omnia_h_ibrahim],

Is this JIRA being actively worked on? If not, would you be happy to delegate it?

Many thanks,

Alexandre

> Move LeaderElectionCommand to tools
> ---
>
> Key: KAFKA-14593
> URL: https://issues.apache.org/jira/browse/KAFKA-14593
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Omnia Ibrahim
>Priority: Major
>






[GitHub] [kafka] nizhikov commented on pull request #13157: KAFKA-14599 Enable javadoc for :connect:mirror module

2023-02-03 Thread via GitHub


nizhikov commented on PR #13157:
URL: https://github.com/apache/kafka/pull/13157#issuecomment-1415407654

   Hello @mimaison 
   
   Do we still want to fix javadoc for public interfaces?





[GitHub] [kafka] Hangleton commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.

2023-02-03 Thread via GitHub


Hangleton commented on code in PR #13040:
URL: https://github.com/apache/kafka/pull/13040#discussion_r1095498625


##
storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateManager.java:
##
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Crc32C;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Maintains a mapping from ProducerIds to metadata about the last appended entries (e.g.
+ * epoch, sequence number, last offset, etc.)
+ *
+ * The sequence number is the last number successfully appended to the partition for the given identifier.
+ * The epoch is used for fencing against zombie writers. The offset is the one of the last successful message
+ * appended to the partition.
+ *
+ * As long as a producer id is contained in the map, the corresponding producer can continue to write data.
+ * However, producer ids can be expired due to lack of recent use or if the last written entry has been deleted from
+ * the log (e.g. if the retention policy is "delete"). For compacted topics, the log cleaner will ensure
+ * that the most recent entry from a given producer id is retained in the log provided it hasn't expired due to
+ * age. This ensures that producer ids will not be expired until either the max expiration time has been reached,
+ * or if the topic also is configured for deletion, the segment containing the last written offset has
+ * been deleted.
+ */
+public class ProducerStateManager {
+private static final Logger log = LoggerFactory.getLogger(ProducerStateManager.class.getName());
+
+// Remove these once UnifiedLog moves to storage module.
+public static final String DELETED_FILE_SUFFIX = ".deleted";
+public static final String PRODUCER_SNAPSHOT_FILE_SUFFIX = ".snapshot";
+
+public static final long LATE_TRANSACTION_BUFFER_MS = 5 * 60 * 1000;
+
+private static final short PRODUCER_SNAPSHOT_VERSION = 1;
+private static final String VERSION_FIELD = "version";
+private static final String CRC_FIELD = "crc";
+private static final String PRODUCER_ID_FIELD = "producer_id";
+private static final String LAST_SEQUENCE_FIELD = "last_sequence";
+private static final String PRODUCER_EPOCH_FIELD = "epoch";
+private static final String LAST_OFFSET_FIELD = "last_offset";
+private static final String OFFSET_DELTA_FIELD = "offset_delta";
+private static final String TIMESTAMP_FIELD = "timestamp";
+private static final String PRODUCER_ENTRIES_FIELD = "producer_entries";
+private static final String COORDINATOR_EPOCH_FIELD = "coordinator_epoch";
+private static final String CURRENT_TXN_FIRST_OFFSET_FIELD = "current_txn_first_offset";
+
+private static final int VERSION_OFFSET = 0;
+private static final int CRC_OFFSET = VERSION_OFFSET + 2;
+private static final int PR