chia7712 commented on code in PR #16873:
URL: https://github.com/apache/kafka/pull/16873#discussion_r1728211668
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1103,35 +1103,41 @@ class KafkaApis(val requestChannel: RequestChannel,
val responseTopics = authorizedRequestInfo.map { topic =>
val responsePartitions = topic.partitions.asScala.map { partition =>
- val topicPartition = new TopicPartition(topic.name, partition.partitionIndex)
-
- try {
- val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
- topicPartition = topicPartition,
- timestamp = partition.timestamp,
- maxNumOffsets = partition.maxNumOffsets,
- isFromConsumer = offsetRequest.replicaId == ListOffsetsRequest.CONSUMER_REPLICA_ID,
- fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetsRequest.DEBUGGING_REPLICA_ID)
+ if (partition.timestamp() < ListOffsetsRequest.EARLIEST_TIMESTAMP) {
new ListOffsetsPartitionResponse()
.setPartitionIndex(partition.partitionIndex)
- .setErrorCode(Errors.NONE.code)
- .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava)
- } catch {
- // NOTE: UnknownTopicOrPartitionException and NotLeaderOrFollowerException are special cases since these error messages
- // are typically transient and there is no value in logging the entire stack trace for the same
- case e @ (_ : UnknownTopicOrPartitionException |
- _ : NotLeaderOrFollowerException |
- _ : KafkaStorageException) =>
- debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
- correlationId, clientId, topicPartition, e.getMessage))
- new ListOffsetsPartitionResponse()
- .setPartitionIndex(partition.partitionIndex)
- .setErrorCode(Errors.forException(e).code)
- case e: Throwable =>
- error("Error while responding to offset request", e)
+ .setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+ } else {
+ val topicPartition = new TopicPartition(topic.name, partition.partitionIndex)
+
+ try {
+ val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
Review Comment:
> For 3.9, I was thinking that we could change createRemoteLogManager() in
> BrokerServer to take the MV into consideration. If the MV is too low, we could
> just return None even if remote storage has been enabled. This is a bit tricky
> since MV is only available after the following statement.

That is indeed a good protection, but as we discussed in another PR, the MV
is dynamic, so we still need to check the version of the requests sent by
tiered storage at runtime, right?
I guess the complete solution (making tiered storage work with a dynamic MV)
is "feature version", but the "acceptable" fixes we can make for now are:
1. Disable tiered storage at startup when the MV is <= 3.5. This CAN protect
users from running tiered storage with an incorrect MV from the start, but it
CAN NOT handle a dynamic MV, i.e. tiered storage CAN NOT be re-enabled or
re-disabled when the MV is upgraded or downgraded.
2. Return an error when tiered storage sends a request under an incorrect MV.
This CAN surface "noise" to tiered storage when the MV is upgraded or
downgraded, but the error CAN NOT appear promptly: it happens only when tiered
storage actually sends the request.
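
To make the trade-off between the two options concrete, here is a minimal Scala sketch. It is not Kafka code: `MV`, `LastUnsupported`, `remoteLogManagerAtStartup`, and `checkBeforeRequest` are hypothetical names, and the 3.5 threshold is simply the one mentioned above.

```scala
// Hypothetical, simplified model; none of these names are Kafka APIs.
object TieredStorageGuards {
  // Metadata version modeled as (major, minor) with the obvious ordering (assumption).
  final case class MV(major: Int, minor: Int) {
    def <=(that: MV): Boolean =
      major < that.major || (major == that.major && minor <= that.minor)
  }

  // Threshold taken from the comment: tiered storage should not run when MV <= 3.5.
  val LastUnsupported: MV = MV(3, 5)

  // Option 1: decide once at startup. A later MV upgrade/downgrade is never observed.
  def remoteLogManagerAtStartup(remoteStorageEnabled: Boolean, mvAtStartup: MV): Option[String] =
    if (remoteStorageEnabled && !(mvAtStartup <= LastUnsupported)) Some("remote-log-manager")
    else None

  // Option 2: check the current MV each time a request is about to be sent.
  // The error shows up only when a request is actually attempted.
  def checkBeforeRequest(currentMv: MV): Either[String, Unit] =
    if (currentMv <= LastUnsupported) Left("UNSUPPORTED_VERSION")
    else Right(())
}
```

In this model, option 1 is evaluated once and keeps its answer, while option 2 reacts to MV changes but only at the moment a request is made.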
> this makes sense, but want to confirm -- has this been an issue since
> tiered storage was released?

The issue we are discussing happens on the LIST_OFFSETS request, which is used
to handle `OFFSET_MOVED_TO_TIERED_STORAGE`. That error has been produced since 3.6
(https://github.com/apache/kafka/commit/6f197301646135e0bb39a461ca0a07c09c3185fb#diff-78812e247ffeae6f8c49b1b22506434701b1e1bafe7f92ef8f8708059e292bf0).
That means a leader (broker) that can return
`OFFSET_MOVED_TO_TIERED_STORAGE` is able to handle `EARLIEST_LOCAL_TIMESTAMP`
too.
So my first comment may not be correct: `those older brokers can't ever
handle EARLIEST_LOCAL_TIMESTAMP correctly as they have no idea of
EARLIEST_LOCAL_TIMESTAMP`.
In short, we can let it go if the LIST_OFFSETS request is the only thing we
want (or know we need) to fix. Otherwise, we can add some protection (as above).
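
For reference, the guard added in the diff above can be sketched in isolation as follows. This is a simplified stand-in, not the `KafkaApis` code: the object and the `Outcome` type are hypothetical, and the sentinel values are assumed to mirror the constants in `ListOffsetsRequest`, so verify them against the Kafka source.

```scala
// Hypothetical stand-in for the legacy ListOffsets path shown in the diff.
object LegacyListOffsetsGuard {
  // Sentinel timestamps, assumed to mirror ListOffsetsRequest (verify against the source).
  val LatestTimestamp: Long = -1L
  val EarliestTimestamp: Long = -2L
  val MaxTimestamp: Long = -3L
  val EarliestLocalTimestamp: Long = -4L

  sealed trait Outcome
  case object UnsupportedVersion extends Outcome
  case object Proceed extends Outcome

  // Same comparison as the diff: any sentinel below EARLIEST_TIMESTAMP is
  // rejected on the legacy path instead of being passed to the offset lookup.
  def check(timestamp: Long): Outcome =
    if (timestamp < EarliestTimestamp) UnsupportedVersion else Proceed
}
```

Under these assumed values, `EARLIEST_LOCAL_TIMESTAMP` is rejected on the legacy path, while `EARLIEST_TIMESTAMP`, `LATEST_TIMESTAMP`, and ordinary timestamps go through.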