hachikuji commented on a change in pull request #9008: URL: https://github.com/apache/kafka/pull/9008#discussion_r456076927
########## File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java ########## @@ -16,5 +16,16 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.protocol.ApiMessage; + public interface AbstractRequestResponse { + /** + * Return the auto-generated `Message` instance if this request/response relies on one for + * serialization/deserialization. If this class has not yet been updated to rely on the auto-generated protocol + * classes, return `null`. + * @return + */ + default ApiMessage data() { Review comment: Is there an advantage to pulling this up? Seems like we still need to update a bunch more classes. Until we have all the protocols converted, it might be safer to find another approach. ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ########## @@ -1249,26 +1249,26 @@ private CompletedFetch initializeCompletedFetch(CompletedFetch nextCompletedFetc } } - if (partition.highWatermark >= 0) { - log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark); - subscriptions.updateHighWatermark(tp, partition.highWatermark); + if (partition.highWatermark() >= 0) { + log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark()); + subscriptions.updateHighWatermark(tp, partition.highWatermark()); } - if (partition.logStartOffset >= 0) { - log.trace("Updating log start offset for partition {} to {}", tp, partition.logStartOffset); - subscriptions.updateLogStartOffset(tp, partition.logStartOffset); + if (partition.logStartOffset() >= 0) { + log.trace("Updating log start offset for partition {} to {}", tp, partition.logStartOffset()); + subscriptions.updateLogStartOffset(tp, partition.logStartOffset()); } - if (partition.lastStableOffset >= 0) { - log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset); - subscriptions.updateLastStableOffset(tp, partition.lastStableOffset); + if (partition.lastStableOffset() >= 0) { + log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset()); + subscriptions.updateLastStableOffset(tp, partition.lastStableOffset()); } - if (partition.preferredReadReplica.isPresent()) { - subscriptions.updatePreferredReadReplica(completedFetch.partition, partition.preferredReadReplica.get(), () -> { + if (partition.preferredReadReplica().isPresent()) { Review comment: nit: could probably change this to use `ifPresent` ########## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ########## @@ -492,74 +327,51 @@ public int maxBytes() { } public boolean isFromFollower() { - return replicaId >= 0; + return replicaId() >= 0; } public IsolationLevel isolationLevel() { - return isolationLevel; + return IsolationLevel.forId(data.isolationLevel()); } public FetchMetadata metadata() { return metadata; } public String rackId() { - return rackId; + return data.rackId(); } public static FetchRequest parse(ByteBuffer buffer, short version) { - return new FetchRequest(ApiKeys.FETCH.parseRequest(version, buffer), version); + ByteBufferAccessor accessor = new ByteBufferAccessor(buffer); + FetchRequestData message = new FetchRequestData(); + message.read(accessor, version); + return new FetchRequest(message, version); + } + + @Override + public ByteBuffer serialize(RequestHeader header) { Review comment: Are we overriding this so that we save the conversion to `Struct`? As far as I can tell, there's nothing specific to `FetchRequest` below. I wonder if we can move this implementation to `AbstractRequest.serialize` so that we save the conversion to Struct for all APIs that have been converted? ########## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ########## @@ -492,74 +327,51 @@ public int maxBytes() { } public boolean isFromFollower() { - return replicaId >= 0; + return replicaId() >= 0; } public IsolationLevel isolationLevel() { - return isolationLevel; + return IsolationLevel.forId(data.isolationLevel()); } public FetchMetadata metadata() { return metadata; } public String rackId() { - return rackId; + return data.rackId(); } public static FetchRequest parse(ByteBuffer buffer, short version) { - return new FetchRequest(ApiKeys.FETCH.parseRequest(version, buffer), version); + ByteBufferAccessor accessor = new ByteBufferAccessor(buffer); Review comment: In the parsing logic, we still convert to struct first before calling `AbstractRequest.parseRequest`. I think we could bypass the `Struct` conversion by changing `AbstractRequest.parseRequest` to take the `ByteBuffer` instead of the `Struct`. ```java public static AbstractRequest parseRequest(ApiKeys apiKey, short apiVersion, ByteBuffer buffer) { ``` Then in the fetch case, we could just call this method. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org