jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r572363402



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -270,24 +314,40 @@ public T records() {
      */
     public FetchResponse(Errors error,
                          LinkedHashMap<TopicPartition, PartitionData<T>> responseData,
+                         List<IdError> idErrors,
+                         Map<String, Uuid> topicIds,
                          int throttleTimeMs,
                          int sessionId) {
         super(ApiKeys.FETCH);
-        this.data = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+        this.data = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), idErrors, topicIds, sessionId);
         this.responseDataMap = responseData;
     }
 
     public FetchResponse(FetchResponseData fetchResponseData) {
         super(ApiKeys.FETCH);
         this.data = fetchResponseData;
-        this.responseDataMap = toResponseDataMap(fetchResponseData);
+        if (!supportsTopicIds()) {
+            this.responseDataMap = toResponseDataMap(fetchResponseData);
+        } else {
+            this.responseDataMap = null;
+        }
     }
 
     public Errors error() {
         return Errors.forCode(data.errorCode());
     }
 
-    public LinkedHashMap<TopicPartition, PartitionData<T>> responseData() {
+    public LinkedHashMap<TopicPartition, PartitionData<T>> responseData(Map<Uuid, String> topicNames) {
+        if (!supportsTopicIds())
+            return responseDataMap;
+        return toResponseDataMap(data, topicNames);
+
+    }
+
+    // Used when we can guarantee responseData is populated with all possible partitions
+    // This occurs when we have a response version < 13 or we built the FetchResponse with
+    // responseDataMap as a parameter and we have the same topic IDs available.
+    public LinkedHashMap<TopicPartition, PartitionData<T>> resolvedResponseData() {

Review comment:
       I agree. I think it stems from exactly what you said: 
`FetchContext.updateAndGenerateResponseData()` generates a response only for it 
to be generated again. Currently, 
`FetchContext.updateAndGenerateResponseData()` does include all partitions 
(resolved and unresolved). The issue is that the partitions need to be 
down-converted, and down-conversion pulls the partitions from the 
FetchResponse object itself. Because I've changed `responseData` and this is 
the newest version of the response, it tries to reconstruct the map instead of 
pulling the existing `partitionData` object. I thought about changing the 
method to always return the map when it is not null, but that caused issues in 
some places as well. I can look into this again, though.
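
To make the trade-off concrete, here is a minimal sketch of the two behaviours. It is not the actual `FetchResponse` code: `ResponseDataSketch`, `WirePartition`, `responseDataPreferStored`, and `demoSizes` are hypothetical names, map keys are plain strings instead of `TopicPartition`, and it assumes that rebuilding skips any topic ID without a known name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical, simplified stand-in for FetchResponse; not the actual Kafka class.
final class ResponseDataSketch {
    // One partition as it might appear on the wire in a version that uses topic IDs.
    static final class WirePartition {
        final UUID topicId;
        final int partition;
        final String payload;

        WirePartition(UUID topicId, int partition, String payload) {
            this.topicId = topicId;
            this.partition = partition;
            this.payload = payload;
        }
    }

    private final List<WirePartition> wire;
    private final boolean supportsTopicIds;
    // Map supplied at construction time; null when only wire data was available.
    private final LinkedHashMap<String, String> responseDataMap;

    ResponseDataSketch(List<WirePartition> wire,
                       boolean supportsTopicIds,
                       LinkedHashMap<String, String> responseDataMap) {
        this.wire = wire;
        this.supportsTopicIds = supportsTopicIds;
        this.responseDataMap = responseDataMap;
    }

    // Same shape as responseData(topicNames) in the diff: rebuild for ID-based versions.
    LinkedHashMap<String, String> responseData(Map<UUID, String> topicNames) {
        if (!supportsTopicIds)
            return responseDataMap;
        return rebuild(topicNames);
    }

    // The alternative mentioned above: return the stored map whenever it is not null.
    LinkedHashMap<String, String> responseDataPreferStored(Map<UUID, String> topicNames) {
        return responseDataMap != null ? responseDataMap : rebuild(topicNames);
    }

    // Assumption: partitions whose topic ID has no known name are skipped.
    private LinkedHashMap<String, String> rebuild(Map<UUID, String> topicNames) {
        LinkedHashMap<String, String> result = new LinkedHashMap<>();
        for (WirePartition p : wire) {
            String name = topicNames.get(p.topicId);
            if (name != null)
                result.put(name + "-" + p.partition, p.payload);
        }
        return result;
    }

    // Builds a two-partition response where only one topic ID can be resolved,
    // and returns {size of rebuilt map, size of stored map}.
    static int[] demoSizes() {
        UUID idA = UUID.randomUUID();
        UUID idB = UUID.randomUUID();
        List<WirePartition> wire = new ArrayList<>();
        wire.add(new WirePartition(idA, 0, "records-a"));
        wire.add(new WirePartition(idB, 0, "records-b"));
        LinkedHashMap<String, String> stored = new LinkedHashMap<>();
        stored.put("a-0", "records-a");
        stored.put("b-0", "records-b");
        ResponseDataSketch response = new ResponseDataSketch(wire, true, stored);
        Map<UUID, String> known = Map.of(idA, "a");
        return new int[] {
            response.responseData(known).size(),
            response.responseDataPreferStored(known).size()
        };
    }

    public static void main(String[] args) {
        int[] sizes = demoSizes();
        System.out.println("rebuilt=" + sizes[0] + " stored=" + sizes[1]);
    }
}
```

Under these assumptions the rebuilt map holds one partition and the stored map holds both, which is why a caller such as down-conversion would want the stored map when it exists.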



