[ 
https://issues.apache.org/jira/browse/KAFKA-13857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17529173#comment-17529173
 ] 

RivenSun commented on KAFKA-13857:
----------------------------------

[~guozhang] 

When I count the current number of messages for each partition of the topic, I 
would expect to return LEO and logStartOffset.
Because when the leader write rate is much greater than the replica 
synchronization rate, i.e. `ISR == 1`, the `LEO - logStartOffset` value is an 
accurate value of the total number of messages.
Finally when `ISR == AR`,  `LEO - logStartOffset` == `HW- logStartOffset`

If this feature will not be supported in the future, please don't mind that you 
can close this JIRA at any time.

> The listOffsets method of KafkaAdminClient should support returning 
> logEndOffset of topicPartition
> --------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13857
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13857
>             Project: Kafka
>          Issue Type: Improvement
>          Components: admin
>            Reporter: RivenSun
>            Priority: Major
>
> The server side currently handles the LIST_OFFSETS request process as follows:
> {code:java}
> KafkaApis.handleListOffsetRequest() ->
> KafkaApis.handleListOffsetRequestV1AndAbove() ->
> ReplicaManager.fetchOffsetForTimestamp() ->
> Partition.fetchOffsetForTimestamp(){code}
>  
> In the last method above, it is obvious that when the client side does not 
> pass the isolationLevel value, the server side supports returning 
> localLog.logEndOffset.
> {code:java}
> val lastFetchableOffset = isolationLevel match {
>   case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
>   case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
>   case None => localLog.logEndOffset
> } 
> {code}
>  
>  
> KafkaAdminClient is an operation and maintenance management tool, which 
> *should be different from the listOffsets-related methods (offsetsForTimes, 
> beginningOffsets, endOffsets) provided by KafkaConsumer,* *and it should not 
> be limited by the value of {color:#ff0000}isolationLevel {color}in the 
> ListOffsetsOptions parameter.*
> In the current KafkaAdminClient.listOffsets() method, both the AdminClient 
> and the server consider isolationLevel as a required parameter:
> 1) If AdminClient uses new ListOffsetsOptions(null), a NullPointerException 
> will be thrown when AdminClient executes listOffsets() method.
> {code:java}
> ListOffsetsRequest.Builder(...) -> isolationLevel.id(){code}
> 2) The current logic for converting isolationLevel on the server side has not 
> yet handled the case where the user passes in a value that is neither 
> READ_UNCOMMITTED nor READ_COMMITTED :
> {code:java}
> val isolationLevelOpt = if (isClientRequest)
>   Some(offsetRequest.isolationLevel)
> else
>   None {code}
> {code:java}
> public IsolationLevel isolationLevel() {
>     return IsolationLevel.forId(data.isolationLevel());
> } {code}
> h1.  
> h2. Suggestion:
> Added a new enum `NONE` in IsolationLevel, only dedicated to 
> AdminClient.listOffsets() method.
> This change may cause the highestSupportedVersion of 
> ApiMessageType.LIST_OFFSETS to increase by one.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to