[ https://issues.apache.org/jira/browse/KAFKA-13857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17531003#comment-17531003 ]
RivenSun commented on KAFKA-13857:
----------------------------------

[~guozhang] Thank you for your reply and approval.
I don't have a strong need to query the LEO right now. This feature could be supported in a future release.
I will likely be busy with other things for a while, so anyone interested is welcome to assign this JIRA to themselves.
Thanks.

> The listOffsets method of KafkaAdminClient should support returning logEndOffset of topicPartition
> --------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13857
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13857
>             Project: Kafka
>          Issue Type: Improvement
>          Components: admin
>            Reporter: RivenSun
>            Priority: Major
>
> The server side currently handles a LIST_OFFSETS request as follows:
> {code:java}
> KafkaApis.handleListOffsetRequest() ->
> KafkaApis.handleListOffsetRequestV1AndAbove() ->
> ReplicaManager.fetchOffsetForTimestamp() ->
> Partition.fetchOffsetForTimestamp(){code}
>
> In the last method above, it is clear that when the client side does not pass an isolationLevel value, the server side supports returning localLog.logEndOffset.
> {code:java}
> val lastFetchableOffset = isolationLevel match {
>   case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
>   case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
>   case None => localLog.logEndOffset
> }
> {code}
>
> KafkaAdminClient is an operations and maintenance tool, so it *should differ from the listOffsets-related methods (offsetsForTimes, beginningOffsets, endOffsets) provided by KafkaConsumer,* *and it should not be limited by the value of {color:#ff0000}isolationLevel{color} in the ListOffsetsOptions parameter.*
> In the current KafkaAdminClient.listOffsets() method, both the AdminClient and the server treat isolationLevel as a required parameter:
> 1) If the AdminClient uses new ListOffsetsOptions(null), a NullPointerException is thrown when it executes the listOffsets() method.
> {code:java}
> ListOffsetsRequest.Builder(...) -> isolationLevel.id(){code}
> 2) The server-side logic for converting isolationLevel does not yet handle the case where the user passes a value that is neither READ_UNCOMMITTED nor READ_COMMITTED:
> {code:java}
> val isolationLevelOpt = if (isClientRequest)
>   Some(offsetRequest.isolationLevel)
> else
>   None {code}
> {code:java}
> public IsolationLevel isolationLevel() {
>     return IsolationLevel.forId(data.isolationLevel());
> } {code}
>
> h2. Suggestion:
> Add a new enum value `NONE` to IsolationLevel, dedicated to the AdminClient.listOffsets() method.
> This change may require increasing the highestSupportedVersion of ApiMessageType.LIST_OFFSETS by one.
>
>

--
This message was sent by Atlassian Jira
(v8.20.7#820007)
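
For illustration only, the offset selection quoted in the issue description, extended with the proposed `NONE` level, could look roughly like the following self-contained Java sketch. Nothing here is Kafka code: the `IsolationLevelSketch` class, its toy `IsolationLevel` enum, the `LogOffsets` holder and the sample offsets are all hypothetical, and `NONE` does not exist in the real `IsolationLevel` enum.

```java
public class IsolationLevelSketch {
    // Toy mirror of Kafka's IsolationLevel. NONE is the hypothetical value
    // proposed in the issue; it is not part of the real enum.
    enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED, NONE }

    // Hypothetical holder for the three offsets that the quoted Scala snippet
    // reads from localLog.
    static final class LogOffsets {
        final long lastStableOffset;
        final long highWatermark;
        final long logEndOffset;

        LogOffsets(long lastStableOffset, long highWatermark, long logEndOffset) {
            this.lastStableOffset = lastStableOffset;
            this.highWatermark = highWatermark;
            this.logEndOffset = logEndOffset;
        }
    }

    // Same mapping as the quoted match expression, with NONE taking the place
    // of the Scala `None` branch.
    static long lastFetchableOffset(IsolationLevel level, LogOffsets log) {
        switch (level) {
            case READ_COMMITTED:
                return log.lastStableOffset;
            case READ_UNCOMMITTED:
                return log.highWatermark;
            case NONE:
                return log.logEndOffset;
            default:
                throw new IllegalArgumentException("Unknown isolation level: " + level);
        }
    }

    public static void main(String[] args) {
        // Made-up offsets chosen so that LSO <= HW <= LEO.
        LogOffsets log = new LogOffsets(90L, 100L, 120L);
        for (IsolationLevel level : IsolationLevel.values()) {
            System.out.println(level + " -> " + lastFetchableOffset(level, log));
        }
    }
}
```

An explicit enum value, rather than a nullable option, would let the client serialize the level by id as it does today, which is presumably why the issue expects a LIST_OFFSETS version bump.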