hachikuji commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456076927



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
##########
@@ -16,5 +16,16 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.protocol.ApiMessage;
+
 public interface AbstractRequestResponse {
+    /**
+     * Return the auto-generated `Message` instance if this request/response relies on one for
+     * serialization/deserialization. If this class has not yet been updated to rely on the auto-generated protocol classes, return `null`.
+     * @return
+     */
+    default ApiMessage data() {

Review comment:
       Is there an advantage to pulling this up? Seems like we still need to update a bunch more classes. Until we have all the protocols converted, it might be safer to find another approach.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -1249,26 +1249,26 @@ private CompletedFetch initializeCompletedFetch(CompletedFetch nextCompletedFetc
                     }
                 }
 
-                if (partition.highWatermark >= 0) {
-                    log.trace("Updating high watermark for partition {} to 
{}", tp, partition.highWatermark);
-                    subscriptions.updateHighWatermark(tp, 
partition.highWatermark);
+                if (partition.highWatermark() >= 0) {
+                    log.trace("Updating high watermark for partition {} to 
{}", tp, partition.highWatermark());
+                    subscriptions.updateHighWatermark(tp, 
partition.highWatermark());
                 }
 
-                if (partition.logStartOffset >= 0) {
-                    log.trace("Updating log start offset for partition {} to 
{}", tp, partition.logStartOffset);
-                    subscriptions.updateLogStartOffset(tp, 
partition.logStartOffset);
+                if (partition.logStartOffset() >= 0) {
+                    log.trace("Updating log start offset for partition {} to 
{}", tp, partition.logStartOffset());
+                    subscriptions.updateLogStartOffset(tp, 
partition.logStartOffset());
                 }
 
-                if (partition.lastStableOffset >= 0) {
-                    log.trace("Updating last stable offset for partition {} to 
{}", tp, partition.lastStableOffset);
-                    subscriptions.updateLastStableOffset(tp, 
partition.lastStableOffset);
+                if (partition.lastStableOffset() >= 0) {
+                    log.trace("Updating last stable offset for partition {} to 
{}", tp, partition.lastStableOffset());
+                    subscriptions.updateLastStableOffset(tp, 
partition.lastStableOffset());
                 }
 
-                if (partition.preferredReadReplica.isPresent()) {
-                    subscriptions.updatePreferredReadReplica(completedFetch.partition, partition.preferredReadReplica.get(), () -> {
+                if (partition.preferredReadReplica().isPresent()) {

Review comment:
       nit: could probably change this to use `ifPresent`
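
   As a rough illustration of that suggestion (a toy, self-contained example rather than the actual `Fetcher` code), the `isPresent()`/`get()` pair collapses into a single `ifPresent` callback:
   ```java
   import java.util.Optional;

   // Toy example only: demonstrates the isPresent()/get() -> ifPresent refactor
   // suggested above; the names mirror the diff, but this is not Kafka code.
   public class IfPresentExample {
       public static void main(String[] args) {
           Optional<Integer> preferredReadReplica = Optional.of(1);

           // Before: explicit presence check plus get()
           if (preferredReadReplica.isPresent()) {
               System.out.println("Preferred read replica: " + preferredReadReplica.get());
           }

           // After: ifPresent hands the unwrapped value straight to the callback
           preferredReadReplica.ifPresent(replicaId ->
               System.out.println("Preferred read replica: " + replicaId));
       }
   }
   ```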

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -492,74 +327,51 @@ public int maxBytes() {
     }
 
     public boolean isFromFollower() {
-        return replicaId >= 0;
+        return replicaId() >= 0;
     }
 
     public IsolationLevel isolationLevel() {
-        return isolationLevel;
+        return IsolationLevel.forId(data.isolationLevel());
     }
 
     public FetchMetadata metadata() {
         return metadata;
     }
 
     public String rackId() {
-        return rackId;
+        return data.rackId();
     }
 
     public static FetchRequest parse(ByteBuffer buffer, short version) {
-        return new FetchRequest(ApiKeys.FETCH.parseRequest(version, buffer), version);
+        ByteBufferAccessor accessor = new ByteBufferAccessor(buffer);
+        FetchRequestData message = new FetchRequestData();
+        message.read(accessor, version);
+        return new FetchRequest(message, version);
+    }
+
+    @Override
+    public ByteBuffer serialize(RequestHeader header) {

Review comment:
       Are we overriding this so that we save the conversion to `Struct`? As far as I can tell, there's nothing specific to `FetchRequest` below. I wonder if we can move this implementation to `AbstractRequest.serialize` so that we save the conversion to `Struct` for all APIs that have been converted?
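
   To make the idea concrete, here is a toy, self-contained sketch (not the actual `AbstractRequest`/`FetchRequest` API) of keeping the generated-message fast path in the base class and falling back to the old path only when `data()` returns `null`:
   ```java
   import java.nio.ByteBuffer;

   // Toy model of the proposal above, not Kafka classes: the base class serializes
   // from the generated message whenever data() is available, so every converted
   // API skips the Struct hop; unconverted APIs keep the legacy path.
   abstract class ToyAbstractRequest {
       /** Converted requests return their generated message; others return null. */
       abstract ToyMessage data();

       /** Stand-in for the existing Struct-based serialization path. */
       abstract ByteBuffer serializeViaStruct();

       final ByteBuffer serialize() {
           ToyMessage message = data();
           if (message != null) {
               return message.toBytes();   // direct path, no intermediate Struct
           }
           return serializeViaStruct();    // legacy path for unconverted requests
       }
   }

   interface ToyMessage {
       ByteBuffer toBytes();
   }
   ```
   With the base class owning that logic, a per-API override like the one in this diff would no longer be needed.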

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -492,74 +327,51 @@ public int maxBytes() {
     }
 
     public boolean isFromFollower() {
-        return replicaId >= 0;
+        return replicaId() >= 0;
     }
 
     public IsolationLevel isolationLevel() {
-        return isolationLevel;
+        return IsolationLevel.forId(data.isolationLevel());
     }
 
     public FetchMetadata metadata() {
         return metadata;
     }
 
     public String rackId() {
-        return rackId;
+        return data.rackId();
     }
 
     public static FetchRequest parse(ByteBuffer buffer, short version) {
-        return new FetchRequest(ApiKeys.FETCH.parseRequest(version, buffer), version);
+        ByteBufferAccessor accessor = new ByteBufferAccessor(buffer);

Review comment:
       In the parsing logic, we still convert to `Struct` first before calling `AbstractRequest.parseRequest`. I think we could bypass the `Struct` conversion by changing `AbstractRequest.parseRequest` to take the `ByteBuffer` instead of the `Struct`.
   ```java
       public static AbstractRequest parseRequest(ApiKeys apiKey, short apiVersion, ByteBuffer buffer) {
   ```
   Then in the fetch case, we could just call this method.
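
   For example (purely illustrative of the proposal; the overload and the cast do not exist yet), the fetch-specific `parse` could then delegate:
   ```java
       public static FetchRequest parse(ByteBuffer buffer, short version) {
           // Delegates to the proposed ByteBuffer-taking overload, so no Struct
           // is built on the parse path either.
           return (FetchRequest) AbstractRequest.parseRequest(ApiKeys.FETCH, version, buffer);
       }
   ```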



