[ https://issues.apache.org/jira/browse/KAFKA-13857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17530569#comment-17530569 ]
Guozhang Wang commented on KAFKA-13857: --------------------------------------- Hi [~RivenSun], I think it may be valuable to allow admin clients to get LEOs. Since this involves changing the brokers API server with a bump on the list offset protocol like you said, I'd just want to know if there's an existing urgent use case that can support the KIP proposing it. If you feel strong about adding the feature now than in the future, please do not hesitant to propose a KIP asking other's opinions. > The listOffsets method of KafkaAdminClient should support returning > logEndOffset of topicPartition > -------------------------------------------------------------------------------------------------- > > Key: KAFKA-13857 > URL: https://issues.apache.org/jira/browse/KAFKA-13857 > Project: Kafka > Issue Type: Improvement > Components: admin > Reporter: RivenSun > Priority: Major > > The server side currently handles the LIST_OFFSETS request process as follows: > {code:java} > KafkaApis.handleListOffsetRequest() -> > KafkaApis.handleListOffsetRequestV1AndAbove() -> > ReplicaManager.fetchOffsetForTimestamp() -> > Partition.fetchOffsetForTimestamp(){code} > > In the last method above, it is obvious that when the client side does not > pass the isolationLevel value, the server side supports returning > localLog.logEndOffset. > {code:java} > val lastFetchableOffset = isolationLevel match { > case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset > case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark > case None => localLog.logEndOffset > } > {code} > > > KafkaAdminClient is an operation and maintenance management tool, which > *should be different from the listOffsets-related methods (offsetsForTimes, > beginningOffsets, endOffsets) provided by KafkaConsumer,* *and it should not > be limited by the value of {color:#ff0000}isolationLevel {color}in the > ListOffsetsOptions parameter.* > In the current KafkaAdminClient.listOffsets() method, both the AdminClient > and the server consider isolationLevel as a required parameter: > 1) If AdminClient uses new ListOffsetsOptions(null), a NullPointerException > will be thrown when AdminClient executes listOffsets() method. > {code:java} > ListOffsetsRequest.Builder(...) -> isolationLevel.id(){code} > 2) The current logic for converting isolationLevel on the server side has not > yet handled the case where the user passes in a value that is neither > READ_UNCOMMITTED nor READ_COMMITTED : > {code:java} > val isolationLevelOpt = if (isClientRequest) > Some(offsetRequest.isolationLevel) > else > None {code} > {code:java} > public IsolationLevel isolationLevel() { > return IsolationLevel.forId(data.isolationLevel()); > } {code} > h1. > h2. Suggestion: > Added a new enum `NONE` in IsolationLevel, only dedicated to > AdminClient.listOffsets() method. > This change may cause the highestSupportedVersion of > ApiMessageType.LIST_OFFSETS to increase by one. > > -- This message was sent by Atlassian Jira (v8.20.7#820007)