hachikuji commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r565003308
##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -131,6 +194,19 @@ class KafkaMetadataLog(
}
}
+ override def highWatermark: LogOffsetMetadata = {
+ val LogOffsetSnapshot(_, _, hwm, _) = log.fetchOffsetSnapshot
+ val segmentPosition: Optional[OffsetMetadata] = if (hwm.segmentBaseOffset
!= Log.UnknownOffset &&
Review comment:
Maybe use `hwm.messageOffsetOnly`?
##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -113,6 +160,22 @@ class KafkaMetadataLog(
log.truncateTo(offset)
}
+ override def maybeTruncateFullyToLatestSnapshot(): Boolean = {
Review comment:
Having second thoughts about the name here (I know I suggested the
`maybe`). Since there is a returned value, perhaps it is clear enough already
and we can try a more concise name. How about just `truncateToLatestSnapshot`?
##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -983,48 +999,88 @@ private FetchResponseData tryCompleteFetchRequest(
FetchRequestData.FetchPartition request,
long currentTimeMs
) {
- Optional<Errors> errorOpt =
validateLeaderOnlyRequest(request.currentLeaderEpoch());
- if (errorOpt.isPresent()) {
- return buildEmptyFetchResponse(errorOpt.get(), Optional.empty());
- }
+ try {
+ Optional<Errors> errorOpt =
validateLeaderOnlyRequest(request.currentLeaderEpoch());
+ if (errorOpt.isPresent()) {
+ return buildEmptyFetchResponse(errorOpt.get(),
Optional.empty());
+ }
- long fetchOffset = request.fetchOffset();
- int lastFetchedEpoch = request.lastFetchedEpoch();
- LeaderState state = quorum.leaderStateOrThrow();
- Optional<OffsetAndEpoch> divergingEpochOpt =
validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
-
- if (divergingEpochOpt.isPresent()) {
- Optional<FetchResponseData.EpochEndOffset> divergingEpoch =
- divergingEpochOpt.map(offsetAndEpoch -> new
FetchResponseData.EpochEndOffset()
- .setEpoch(offsetAndEpoch.epoch)
- .setEndOffset(offsetAndEpoch.offset));
- return buildFetchResponse(Errors.NONE, MemoryRecords.EMPTY,
divergingEpoch, state.highWatermark());
- } else {
- LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED);
+ long fetchOffset = request.fetchOffset();
+ int lastFetchedEpoch = request.lastFetchedEpoch();
+ LeaderState state = quorum.leaderStateOrThrow();
+ ValidatedFetchOffsetAndEpoch validatedOffsetAndEpoch =
validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
+
+ final Records records;
+ if (validatedOffsetAndEpoch.type() ==
ValidatedFetchOffsetAndEpoch.Type.VALID) {
+ LogFetchInfo info = log.read(fetchOffset,
Isolation.UNCOMMITTED);
- if (state.updateReplicaState(replicaId, currentTimeMs,
info.startOffsetMetadata)) {
- onUpdateLeaderHighWatermark(state, currentTimeMs);
+ if (state.updateReplicaState(replicaId, currentTimeMs,
info.startOffsetMetadata)) {
+ onUpdateLeaderHighWatermark(state, currentTimeMs);
+ }
+
+ records = info.records;
+ } else {
+ records = MemoryRecords.EMPTY;
}
- return buildFetchResponse(Errors.NONE, info.records,
Optional.empty(), state.highWatermark());
+ return buildFetchResponse(Errors.NONE, records,
validatedOffsetAndEpoch, state.highWatermark());
+ } catch (Exception e) {
+ logger.error("Caught unexpected error in fetch completion of
request {}", request, e);
+ return buildEmptyFetchResponse(Errors.UNKNOWN_SERVER_ERROR,
Optional.empty());
}
}
/**
* Check whether a fetch offset and epoch is valid. Return the diverging
epoch, which
* is the largest epoch such that subsequent records are known to diverge.
*/
- private Optional<OffsetAndEpoch> validateFetchOffsetAndEpoch(long
fetchOffset, int lastFetchedEpoch) {
- if (fetchOffset == 0 && lastFetchedEpoch == 0) {
- return Optional.empty();
+ private ValidatedFetchOffsetAndEpoch validateFetchOffsetAndEpoch(long
fetchOffset, int lastFetchedEpoch) {
+ if (log.startOffset() == 0 && fetchOffset == 0) {
+ if (lastFetchedEpoch != 0) {
+ logger.warn(
+ "Replica sent a zero fetch offset ({}) but the last
fetched epoch ({}) was not zero",
+ fetchOffset,
+ lastFetchedEpoch
+ );
+ }
+ return ValidatedFetchOffsetAndEpoch.valid(new
OffsetAndEpoch(fetchOffset, lastFetchedEpoch));
}
- OffsetAndEpoch endOffsetAndEpoch =
log.endOffsetForEpoch(lastFetchedEpoch)
- .orElse(new OffsetAndEpoch(-1L, -1));
- if (endOffsetAndEpoch.epoch != lastFetchedEpoch ||
endOffsetAndEpoch.offset < fetchOffset) {
- return Optional.of(endOffsetAndEpoch);
+
+ OffsetAndEpoch endOffsetAndEpoch =
log.endOffsetForEpoch(lastFetchedEpoch).orElseThrow(() -> {
+ return new IllegalStateException(
+ String.format(
+ "Expected to find an end offset for epoch %s since it must
be less than the current epoch %s",
+ lastFetchedEpoch,
+ quorum.epoch()
+ )
+ );
+ });
+
+ if (log.oldestSnapshotId().isPresent() &&
+ ((fetchOffset < log.startOffset()) ||
+ (fetchOffset == log.startOffset() && lastFetchedEpoch !=
log.oldestSnapshotId().get().epoch) ||
+ (lastFetchedEpoch < log.oldestSnapshotId().get().epoch))) {
Review comment:
This is related to my other comment: I think this check can go away if we
change `endOffsetForEpoch` to return the end offset and epoch only when they
can be definitively determined.
The other condition where we need to return a snapshot is when the returned
epoch `endOffsetAndEpoch.epoch` equals the oldest snapshot epoch and the fetch
offset is less than the oldest snapshot offset, which I think is equivalent to
the check here. So I think we could write it like this:
```
OffsetAndEpoch endOffsetAndEpoch = log.endOffsetForEpoch(lastFetchedEpoch)
if (!endOffsetAndEpoch.isPresent) {
// return latest snapshot
}
OffsetAndEpoch oldestSnapshotId = log.oldestSnapshotId().orElse(new
OffsetAndEpoch(0, 0));
if (endOffsetAndEpoch.epoch == oldestSnapshotId.epoch && fetchOffset <
oldestSnapshotId.offset) {
// return latest snapshot
} else if (endOffsetAndEpoch.epoch != lastFetchedEpoch ||
endOffsetAndEpoch.offset < fetchOffset) {
// return diverging epoch
} else {
// valid fetch
}
```
##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -147,18 +223,102 @@ class KafkaMetadataLog(
}
override def createSnapshot(snapshotId: raft.OffsetAndEpoch):
RawSnapshotWriter = {
- FileRawSnapshotWriter.create(log.dir.toPath, snapshotId)
+ // Do not let the state machine create snapshots older than the latest
snapshot
+ latestSnapshotId().ifPresent { latest =>
+ if (latest.epoch > snapshotId.epoch || latest.offset >
snapshotId.offset) {
+ // Since snapshots are less than the high-watermark absolute offset
comparison is okay.
+ throw new IllegalArgumentException(
+ s"Attemting to create a snapshot ($snapshotId) that is not greater
than the latest snapshot ($latest)"
+ )
+ }
+ }
+
+ FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this))
}
override def readSnapshot(snapshotId: raft.OffsetAndEpoch):
Optional[RawSnapshotReader] = {
try {
- Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+ if (snapshotIds.contains(snapshotId)) {
+ Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+ } else {
+ Optional.empty()
+ }
+ } catch {
+ case _: NoSuchFileException =>
+ Optional.empty()
+ }
+ }
+
+ override def latestSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+ try {
+ Optional.of(snapshotIds.last)
} catch {
- case e: NoSuchFileException => Optional.empty()
+ case _: NoSuchElementException =>
+ Optional.empty()
+ }
+ }
+
+ override def oldestSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+ oldestSnapshotId
+ }
+
+ override def onSnapshotFrozen(snapshotId: raft.OffsetAndEpoch): Unit = {
+ snapshotIds.add(snapshotId)
+ }
+
+ override def deleteToNewOldestSnapshotId(logStartSnapshotId:
raft.OffsetAndEpoch): Boolean = {
Review comment:
nit: the name is a tad awkward. How about `deleteUpToSnapshot` or
`deleteBeforeSnapshot`?
##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -983,48 +999,88 @@ private FetchResponseData tryCompleteFetchRequest(
FetchRequestData.FetchPartition request,
long currentTimeMs
) {
- Optional<Errors> errorOpt =
validateLeaderOnlyRequest(request.currentLeaderEpoch());
- if (errorOpt.isPresent()) {
- return buildEmptyFetchResponse(errorOpt.get(), Optional.empty());
- }
+ try {
+ Optional<Errors> errorOpt =
validateLeaderOnlyRequest(request.currentLeaderEpoch());
+ if (errorOpt.isPresent()) {
+ return buildEmptyFetchResponse(errorOpt.get(),
Optional.empty());
+ }
- long fetchOffset = request.fetchOffset();
- int lastFetchedEpoch = request.lastFetchedEpoch();
- LeaderState state = quorum.leaderStateOrThrow();
- Optional<OffsetAndEpoch> divergingEpochOpt =
validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
-
- if (divergingEpochOpt.isPresent()) {
- Optional<FetchResponseData.EpochEndOffset> divergingEpoch =
- divergingEpochOpt.map(offsetAndEpoch -> new
FetchResponseData.EpochEndOffset()
- .setEpoch(offsetAndEpoch.epoch)
- .setEndOffset(offsetAndEpoch.offset));
- return buildFetchResponse(Errors.NONE, MemoryRecords.EMPTY,
divergingEpoch, state.highWatermark());
- } else {
- LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED);
+ long fetchOffset = request.fetchOffset();
+ int lastFetchedEpoch = request.lastFetchedEpoch();
+ LeaderState state = quorum.leaderStateOrThrow();
+ ValidatedFetchOffsetAndEpoch validatedOffsetAndEpoch =
validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
+
+ final Records records;
+ if (validatedOffsetAndEpoch.type() ==
ValidatedFetchOffsetAndEpoch.Type.VALID) {
+ LogFetchInfo info = log.read(fetchOffset,
Isolation.UNCOMMITTED);
- if (state.updateReplicaState(replicaId, currentTimeMs,
info.startOffsetMetadata)) {
- onUpdateLeaderHighWatermark(state, currentTimeMs);
+ if (state.updateReplicaState(replicaId, currentTimeMs,
info.startOffsetMetadata)) {
+ onUpdateLeaderHighWatermark(state, currentTimeMs);
+ }
+
+ records = info.records;
+ } else {
+ records = MemoryRecords.EMPTY;
}
- return buildFetchResponse(Errors.NONE, info.records,
Optional.empty(), state.highWatermark());
+ return buildFetchResponse(Errors.NONE, records,
validatedOffsetAndEpoch, state.highWatermark());
+ } catch (Exception e) {
+ logger.error("Caught unexpected error in fetch completion of
request {}", request, e);
+ return buildEmptyFetchResponse(Errors.UNKNOWN_SERVER_ERROR,
Optional.empty());
}
}
/**
* Check whether a fetch offset and epoch is valid. Return the diverging
epoch, which
* is the largest epoch such that subsequent records are known to diverge.
*/
- private Optional<OffsetAndEpoch> validateFetchOffsetAndEpoch(long
fetchOffset, int lastFetchedEpoch) {
- if (fetchOffset == 0 && lastFetchedEpoch == 0) {
- return Optional.empty();
+ private ValidatedFetchOffsetAndEpoch validateFetchOffsetAndEpoch(long
fetchOffset, int lastFetchedEpoch) {
+ if (log.startOffset() == 0 && fetchOffset == 0) {
+ if (lastFetchedEpoch != 0) {
+ logger.warn(
+ "Replica sent a zero fetch offset ({}) but the last
fetched epoch ({}) was not zero",
+ fetchOffset,
+ lastFetchedEpoch
+ );
+ }
+ return ValidatedFetchOffsetAndEpoch.valid(new
OffsetAndEpoch(fetchOffset, lastFetchedEpoch));
Review comment:
nit: should we use 0 for `fetchOffset` and `lastFetchedEpoch` here?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org