jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1581905174


##########
raft/src/main/java/org/apache/kafka/raft/internals/PartitionListener.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.util.Optional;
+import java.util.OptionalLong;
+import org.apache.kafka.common.message.KRaftVersionRecord;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.ControlRecord;
+import org.apache.kafka.raft.Isolation;
+import org.apache.kafka.raft.LogFetchInfo;
+import org.apache.kafka.raft.ReplicatedLog;
+import org.apache.kafka.server.common.serialization.RecordSerde;
+import org.apache.kafka.snapshot.RawSnapshotReader;
+import org.apache.kafka.snapshot.RecordsSnapshotReader;
+import org.apache.kafka.snapshot.SnapshotReader;
+import org.slf4j.Logger;
+
+/**
+ * The KRaft state machine for tracking control records in the topic partition.
+ *
+ * This type keeps track of changes to the finalized kraft.version and the sets of voters.
+ */
+final public class PartitionListener {
+    private final ReplicatedLog log;
+    private final RecordSerde<?> serde;
+    private final BufferSupplier bufferSupplier;
+    private final Logger logger;
+    private final int maxBatchSizeBytes;
+
+    // These are objects are synchronized using the perspective object monitor. The two actors
+    // are the KRaft driver and the RaftClient callers
+    private final VoterSetHistory voterSetHistory;
+    private final History<Short> kraftVersionHistory = new TreeMapHistory<>();
+
+    // This synchronization is enough because
+    // 1. The write operation updateListener only sets the value without reading and updates to
+    // voterSetHistory or kraftVersionHistory are done before setting the nextOffset
+    //
+    // 2. The read operations lastVoterSet, voterSetAtOffset and kraftVersionAtOffset read
+    // the nextOffset first before reading voterSetHistory or kraftVersionHistory
+    private volatile long nextOffset = 0;
+
+    /**
+     * Constructs an internal log listener
+     *
+     * @param staticVoterSet the set of voter statically configured
+     * @param log the on disk topic partition
+     * @param serde the record decoder for data records
+     * @param bufferSupplier the supplier of byte buffers
+     * @param maxBatchSizeBytes the maximum size of record batch
+     * @param logContext the log context
+     */
+    public PartitionListener(
+        Optional<VoterSet> staticVoterSet,
+        ReplicatedLog log,
+        RecordSerde<?> serde,

Review Comment:
   This type doesn't care about the actual record type here since it only reads control records. Let's keep it as any object for now so that we can use `String` and `StringSerde` in the test.
   
   I actually created [KAFKA-16517](https://issues.apache.org/jira/browse/KAFKA-16517) to eventually remove the need to decode data records. I'll implement that after KIP-853 is feature complete.
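   
   To illustrate why the wildcard is enough, here is a minimal, self-contained sketch. It does not use the Kafka classes: `Serde`, `StringSerde`, `Record` and `ControlOnlyListener` below are hypothetical stand-ins, and the real `RecordSerde` interface and listener differ. It only shows that a class which never uses the decoded data value can accept a serde of any type.

   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.List;

   public class WildcardSerdeSketch {
       // Hypothetical stand-in for a record serde; NOT Kafka's RecordSerde interface.
       interface Serde<T> {
           T read(byte[] bytes);
       }

       // Hypothetical string serde, playing the role a string serde would play in a test.
       static final class StringSerde implements Serde<String> {
           @Override
           public String read(byte[] bytes) {
               return new String(bytes, StandardCharsets.UTF_8);
           }
       }

       // Hypothetical record: either a control record or a data record.
       static final class Record {
           final boolean isControl;
           final byte[] payload;

           Record(boolean isControl, byte[] payload) {
               this.isControl = isControl;
               this.payload = payload;
           }
       }

       // Hypothetical listener that only tracks control records.
       static final class ControlOnlyListener {
           // The wildcard says: any data record type is acceptable here.
           private final Serde<?> serde;
           private int controlRecords = 0;

           ControlOnlyListener(Serde<?> serde) {
               this.serde = serde;
           }

           void handle(List<Record> batch) {
               for (Record record : batch) {
                   if (record.isControl) {
                       controlRecords++;
                   } else {
                       // The data record is decoded but the value is discarded,
                       // so its static type never matters to this class.
                       Object ignored = serde.read(record.payload);
                   }
               }
           }

           int controlRecords() {
               return controlRecords;
           }
       }

       public static void main(String[] args) {
           // Any Serde<T> can be passed; a string serde keeps the setup small.
           ControlOnlyListener listener = new ControlOnlyListener(new StringSerde());
           listener.handle(List.of(
               new Record(false, "data".getBytes(StandardCharsets.UTF_8)),
               new Record(true, new byte[0]),
               new Record(true, new byte[0])
           ));
           System.out.println(listener.controlRecords());
       }
   }
   ```

   Because the field is declared as `Serde<?>`, a test can pass whichever serde is most convenient without the listener itself becoming generic.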



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
