satishd commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1590570923
########## core/src/main/java/kafka/server/TierStateMachine.java: ########## @@ -40,19 +90,176 @@ public interface TierStateMachine { */ PartitionFetchState start(TopicPartition topicPartition, PartitionFetchState currentFetchState, - PartitionData fetchPartitionData) throws Exception; + PartitionData fetchPartitionData) throws Exception { + OffsetAndEpoch epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch()); + int epoch = epochAndLeaderLocalStartOffset.leaderEpoch(); + long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset(); + + long offsetToFetch; + replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark(); + replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark(); + + UnifiedLog unifiedLog; + if (useFutureLog) { + unifiedLog = replicaMgr.futureLogOrException(topicPartition); + } else { + unifiedLog = replicaMgr.localLogOrException(topicPartition); + } + + try { + offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset(), unifiedLog); + } catch (RemoteStorageException e) { + replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark(); + replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark(); + throw e; + } + + OffsetAndEpoch fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch()); + long leaderEndOffset = fetchLatestOffsetResult.offset(); + + long initialLag = leaderEndOffset - offsetToFetch; + + return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(), + Fetching$.MODULE$, unifiedLog.latestEpoch()); + + } + + private OffsetForLeaderEpochResponseData.EpochEndOffset fetchEarlierEpochEndOffset(Integer epoch, + TopicPartition partition, + Integer currentLeaderEpoch) { + int previousEpoch = epoch - 1; + + // Find the end-offset for the epoch earlier to the given epoch from the leader + Map<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition> partitionsWithEpochs = new HashMap<>(); + partitionsWithEpochs.put(partition, new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(partition.partition()).setCurrentLeaderEpoch(currentLeaderEpoch).setLeaderEpoch(previousEpoch)); + Option<OffsetForLeaderEpochResponseData.EpochEndOffset> maybeEpochEndOffset = leader.fetchEpochEndOffsets(JavaConverters.mapAsScalaMap(partitionsWithEpochs)).get(partition); + if (maybeEpochEndOffset.isEmpty()) { + throw new KafkaException("No response received for partition: " + partition); + } + + OffsetForLeaderEpochResponseData.EpochEndOffset epochEndOffset = maybeEpochEndOffset.get(); + if (epochEndOffset.errorCode() != Errors.NONE.code()) { + throw Errors.forCode(epochEndOffset.errorCode()).exception(); + } + + return epochEndOffset; + } + + private List<EpochEntry> readLeaderEpochCheckpoint(RemoteLogManager rlm, + RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException, RemoteStorageException { + InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH); + try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) { + CheckpointFile.CheckpointReadBuffer<EpochEntry> readBuffer = new CheckpointFile.CheckpointReadBuffer<>("", bufferedReader, 0, LeaderEpochCheckpointFile.FORMATTER); + return readBuffer.read(); + } + } + + private void buildProducerSnapshotFile(UnifiedLog unifiedLog, + long nextOffset, + RemoteLogSegmentMetadata remoteLogSegmentMetadata, + RemoteLogManager rlm) throws IOException, RemoteStorageException { + // Restore producer snapshot + File snapshotFile = LogFileUtils.producerSnapshotFile(unifiedLog.dir(), nextOffset); + Path tmpSnapshotFile = Paths.get(snapshotFile.getAbsolutePath() + ".tmp"); + // Copy it to snapshot file in atomic manner. + Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT), + tmpSnapshotFile, StandardCopyOption.REPLACE_EXISTING); + Utils.atomicMoveWithFallback(tmpSnapshotFile, snapshotFile.toPath(), false); + + // Reload producer snapshots. + unifiedLog.producerStateManager().truncateFullyAndReloadSnapshots(); + unifiedLog.loadProducerState(nextOffset); + } /** - * Optionally advance the state of the tier state machine, based on the - * current PartitionFetchState. The decision to advance the tier - * state machine is implementation specific. - * - * @param topicPartition the topic partition - * @param currentFetchState the current PartitionFetchState which will - * be used to derive the return value - * - * @return the new PartitionFetchState if the tier state machine was advanced, otherwise, return the currentFetchState + * It tries to build the required state for this partition from leader and remote storage so that it can start + * fetching records from the leader. The return value is the next offset to fetch from the leader, which is the + * next offset following the end offset of the remote log portion. */ - Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition, Review Comment: We can remove this function for now. We will introduce it when https://issues.apache.org/jira/browse/KAFKA-13560 is addressed. I will leave a comment in KAFKA-13560 once this PR is merged. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org