satishd commented on code in PR #14727:
URL: https://github.com/apache/kafka/pull/14727#discussion_r1392031838
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -133,6 +134,10 @@ public class RemoteLogManager implements Closeable {
private static final Logger LOGGER =
LoggerFactory.getLogger(RemoteLogManager.class);
private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX =
"remote-log-reader";
+ private static final Set<RemoteLogSegmentState>
SEGMENT_DELETION_VALID_STATES = Collections.unmodifiableSet(EnumSet.of(
Review Comment:
COPY_SEGMENT_STARTED segments are eligible for deletion when those segments
were not able to be copied by the leader as the leader went through ungraceful
shutdown or for any oher reasons. New leader may pickup the resepctive segments
for the targeted offsets that need to be copied and the earlier failed segment
will remain in the COPY_SEGMENT_STARTED state and it will eventually be deleted
by retention cleanup logic.
So, COPY_SEGMENT_STARTED is a valid transition even now when copy and
deletion are happening sequentially.
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -1027,8 +1027,32 @@ void testFindOffsetByTimestampWithInvalidEpochSegments()
throws IOException, Rem
assertEquals(Optional.empty(), maybeTimestampAndOffset3);
}
+ @Test
+ void testFindOffsetByTimestampWithSegmentNotReady() throws IOException,
RemoteStorageException {
+ TopicPartition tp = leaderTopicIdPartition.topicPartition();
+
+ long ts = time.milliseconds();
+ long startOffset = 120;
+ int targetLeaderEpoch = 10;
+
+ TreeMap<Integer, Long> validSegmentEpochs = new TreeMap<>();
+ validSegmentEpochs.put(targetLeaderEpoch, startOffset);
+
+ LeaderEpochFileCache leaderEpochFileCache = new
LeaderEpochFileCache(tp, checkpoint);
+ leaderEpochFileCache.assign(4, 99L);
+ leaderEpochFileCache.assign(5, 99L);
+ leaderEpochFileCache.assign(targetLeaderEpoch, startOffset);
+ leaderEpochFileCache.assign(12, 500L);
+
+ doTestFindOffsetByTimestamp(ts, startOffset, targetLeaderEpoch,
validSegmentEpochs, RemoteLogSegmentState.COPY_SEGMENT_STARTED);
+
+ Optional<FileRecords.TimestampAndOffset> maybeTimestampAndOffset1 =
remoteLogManager.findOffsetByTimestamp(tp, ts, startOffset,
leaderEpochFileCache);
Review Comment:
Can you rename `maybeTimestampAndOffset1` as `maybeTimestampAndOffset`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]