jeffkbkim commented on code in PR #14849: URL: https://github.com/apache/kafka/pull/14849#discussion_r1412627530
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/ContextStateMachine.java: ########## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.slf4j.Logger; + +/** + * ContextStateMachine is a wrapper on top of the coordinator state machine. Generally, only a single thread at a + * time will access this object but multiple threads may access while loading the __consumer_offsets topic partition. + */ +class ContextStateMachine<S extends CoordinatorShard<U>, U> implements CoordinatorPlayback<U> { + /** + * The logger. + */ + private final Logger log; + /** + * The actual state machine. + */ + private S coordinator; + + /** + * The snapshot registry backing the coordinator. + */ + private final SnapshotRegistry snapshotRegistry; + + /** + * The topic partition. + */ + private final TopicPartition tp; + + /** + * The last offset written to the partition. + */ + private long lastWrittenOffset; + + /** + * The last offset committed. 
This represents the high + * watermark of the partition. + */ + private long lastCommittedOffset; + + ContextStateMachine( + LogContext logContext, + SnapshotRegistry snapshotRegistry, + S coordinator, + TopicPartition tp + ) { + this.log = logContext.logger(ContextStateMachine.class); + this.coordinator = coordinator; + this.snapshotRegistry = snapshotRegistry; + this.tp = tp; + this.lastWrittenOffset = 0; + this.lastCommittedOffset = 0; + snapshotRegistry.getOrCreateSnapshot(0); + snapshotRegistry.deleteSnapshotsUpTo(0); + } + + /** + * Reverts the last written offset. This also reverts the snapshot + * registry to this offset. All the changes applied after the offset + * are lost. + * + * @param offset The offset to revert to. + */ + synchronized void revertLastWrittenOffset( + long offset + ) { + if (offset > lastWrittenOffset) { + throw new IllegalStateException("New offset " + offset + " of " + tp + + " must be smaller than " + lastWrittenOffset + "."); + } + + log.debug("Revert last written offset of {} to {}.", tp, offset); + lastWrittenOffset = offset; + snapshotRegistry.revertToSnapshot(offset); + } + + @Override + public synchronized void replay( + long producerId, + short producerEpoch, + U record + ) { + coordinator.replay(producerId, producerEpoch, record); + } + + /** + * Updates the last written offset. This also create a new snapshot + * in the snapshot registry. + * + * @param offset The new last written offset. + */ + @Override + public synchronized void updateLastWrittenOffset(Long offset) { + if (offset <= lastWrittenOffset) { + throw new IllegalStateException("New last written offset " + offset + " of " + tp + + " must be greater than " + lastWrittenOffset + "."); + } + + log.debug("Update last written offset of {} to {}.", tp, offset); + lastWrittenOffset = offset; + snapshotRegistry.getOrCreateSnapshot(offset); + } + + /** + * Updates the last committed offset. This completes all the deferred + * events waiting on this offset. 
This also cleanups all the snapshots + * prior to this offset. + * + * @param offset The new last committed offset. + */ + @Override + public synchronized void updateLastCommittedOffset(Long offset) { + if (offset < lastCommittedOffset) { + throw new IllegalStateException("New committed offset " + offset + " of " + tp + + " must be greater than or equal to " + lastCommittedOffset + "."); + } + + lastCommittedOffset = offset; + snapshotRegistry.deleteSnapshotsUpTo(offset); + log.debug("Update committed offset of {} to {}.", tp, offset); + } + + /** + * @return The last written offset. + */ + synchronized long lastWrittenOffset() { + return this.lastWrittenOffset; + } + + /** + * @return The last committed offset. + */ + synchronized long lastCommittedOffset() { + return this.lastCommittedOffset; + } + + /** + * @return The coordinator. + */ + synchronized S coordinator() { + return this.coordinator; + } + + synchronized SnapshotRegistry snapshotRegistry() { + return this.snapshotRegistry; + } Review Comment: This is used in tests. Added Javadocs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org