[ https://issues.apache.org/jira/browse/KAFKA-13857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17529173#comment-17529173 ]
RivenSun commented on KAFKA-13857:
----------------------------------

[~guozhang]

When I count the current number of messages in each partition of a topic, I would expect listOffsets to be able to return both LEO and logStartOffset. When the leader's write rate is much greater than the replica synchronization rate, i.e. `ISR == 1`, `LEO - logStartOffset` is the accurate value for the total number of messages. Once the replicas have caught up, i.e. `ISR == AR`, `LEO - logStartOffset` == `HW - logStartOffset`.

If this feature is not going to be supported, please feel free to close this JIRA at any time.

> The listOffsets method of KafkaAdminClient should support returning
> logEndOffset of topicPartition
> --------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13857
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13857
>             Project: Kafka
>          Issue Type: Improvement
>          Components: admin
>            Reporter: RivenSun
>            Priority: Major
>
> The server side currently handles a LIST_OFFSETS request as follows:
> {code:java}
> KafkaApis.handleListOffsetRequest() ->
> KafkaApis.handleListOffsetRequestV1AndAbove() ->
> ReplicaManager.fetchOffsetForTimestamp() ->
> Partition.fetchOffsetForTimestamp(){code}
>
> In the last method above, it is clear that when the client side does not
> pass an isolationLevel value, the server side supports returning
> localLog.logEndOffset.
> {code:java}
> val lastFetchableOffset = isolationLevel match {
>   case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
>   case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
>   case None => localLog.logEndOffset
> }
> {code}
>
> KafkaAdminClient is an operations and maintenance tool, so it
> *should behave differently from the listOffsets-related methods (offsetsForTimes,
> beginningOffsets, endOffsets) provided by KafkaConsumer,* *and it should not
> be limited by the value of {color:#ff0000}isolationLevel{color} in the
> ListOffsetsOptions parameter.*
> In the current KafkaAdminClient.listOffsets() method, both the AdminClient
> and the server treat isolationLevel as a required parameter:
> 1) If AdminClient uses new ListOffsetsOptions(null), a NullPointerException
> is thrown when AdminClient executes the listOffsets() method.
> {code:java}
> ListOffsetsRequest.Builder(...) -> isolationLevel.id(){code}
> 2) The current server-side logic for converting isolationLevel does not
> yet handle the case where the user passes in a value that is neither
> READ_UNCOMMITTED nor READ_COMMITTED:
> {code:java}
> val isolationLevelOpt = if (isClientRequest)
>   Some(offsetRequest.isolationLevel)
> else
>   None {code}
> {code:java}
> public IsolationLevel isolationLevel() {
>     return IsolationLevel.forId(data.isolationLevel());
> } {code}
>
> h2. Suggestion:
> Add a new enum value `NONE` to IsolationLevel, dedicated solely to the
> AdminClient.listOffsets() method.
> This change may cause the highestSupportedVersion of
> ApiMessageType.LIST_OFFSETS to increase by one.

--
This message was sent by Atlassian Jira
(v8.20.7#820007)
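
As a rough illustration of the behavior discussed in this thread, the sketch below mirrors the quoted `lastFetchableOffset` match in plain Java and computes the span `offset - logStartOffset` that the comment describes. It does not use any Kafka classes: `ListOffsetsSketch`, `Isolation`, `LogOffsets`, and `offsetSpan` are hypothetical names made up for this example, and the sample offsets are invented.

```java
import java.util.Optional;

public class ListOffsetsSketch {
    // Hypothetical stand-in for Kafka's IsolationLevel; not the real class.
    enum Isolation { READ_UNCOMMITTED, READ_COMMITTED }

    // Hypothetical snapshot of one partition's offsets on the leader.
    static final class LogOffsets {
        final long logStartOffset;
        final long lastStableOffset;
        final long highWatermark;
        final long logEndOffset;

        LogOffsets(long logStartOffset, long lastStableOffset,
                   long highWatermark, long logEndOffset) {
            this.logStartOffset = logStartOffset;
            this.lastStableOffset = lastStableOffset;
            this.highWatermark = highWatermark;
            this.logEndOffset = logEndOffset;
        }
    }

    // Mirrors the quoted Scala match: no isolation level means LEO.
    static long lastFetchableOffset(LogOffsets log, Optional<Isolation> isolation) {
        if (!isolation.isPresent()) {
            return log.logEndOffset;
        }
        switch (isolation.get()) {
            case READ_COMMITTED:
                return log.lastStableOffset;
            default: // READ_UNCOMMITTED
                return log.highWatermark;
        }
    }

    // Offset span between the log start and the selected end offset.
    static long offsetSpan(LogOffsets log, Optional<Isolation> isolation) {
        return lastFetchableOffset(log, isolation) - log.logStartOffset;
    }

    public static void main(String[] args) {
        // Invented offsets: LEO is ahead of HW.
        LogOffsets log = new LogOffsets(100L, 380L, 400L, 1000L);
        System.out.println(offsetSpan(log, Optional.empty()));                          // prints 900
        System.out.println(offsetSpan(log, Optional.of(Isolation.READ_UNCOMMITTED)));   // prints 300
    }
}
```

With these invented offsets, only the "no isolation level" path yields the `LEO - logStartOffset` span; the two isolation levels available to AdminClient today stop at the high watermark or the last stable offset.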