apoorvmittal10 commented on code in PR #22479:
URL: https://github.com/apache/kafka/pull/22479#discussion_r3361829394


##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -777,7 +777,9 @@ class BrokerServer(
           new ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key => 
shareCoordinator.partitionFor(key), config.interBrokerListenerName, 
groupConfigManager),
           Time.SYSTEM,
           shareGroupTimer,
-          shareGroupMetrics
+          shareGroupMetrics,
+          config.getInt(ServerConfigs.FETCH_MAX_BYTES_CONFIG),
+          new ReplicaManagerLogReader(replicaManager)

Review Comment:
   There should already be a ReplicaManagerLogReader created for
SharePartitionManager. Why not reuse it?



##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -777,7 +777,9 @@ class BrokerServer(
           new ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key => 
shareCoordinator.partitionFor(key), config.interBrokerListenerName, 
groupConfigManager),
           Time.SYSTEM,
           shareGroupTimer,
-          shareGroupMetrics
+          shareGroupMetrics,
+          config.getInt(ServerConfigs.FETCH_MAX_BYTES_CONFIG),

Review Comment:
   Will the reads cover multiple offsets or one at a time? I expect that only
the data for specific offsets will be fetched, correct? If so, do we need 55MB
as the max bytes config?



##########
server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -345,10 +366,20 @@ public void populateDLQTopicData() throws ConfigException 
{
         }
 
         public ProduceRequestData.TopicProduceData topicProduceData() {
+            maybeFetchRecordData();
+
             List<SimpleRecord> simpleRecords = new ArrayList<>();
             for (long i = param.firstOffset(); i <= param.lastOffset(); i++) {
                 long timestamp = time.hiResClockMs();
-                simpleRecords.add(new SimpleRecord(timestamp, (byte[]) null, 
null, headers(i)));
+                int recordIndex = (int) (i - param.firstOffset());
+                ByteBuffer key = null;
+                ByteBuffer value = null;
+                if (originalRecordData.size() > recordIndex) {

Review Comment:
   What is the significance of this check? Don't we expect that if
originalRecordData is not empty, then data for all offsets is present? In that
case an if (!originalRecordData.isEmpty()) check should be sufficient.
   
   Also, where do we validate that originalRecordData strictly contains data
for every requested offset?
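
   To illustrate the kind of validation I mean, here is a minimal sketch. It is
not the PR's code: `OffsetCoverageSketch` and `coversRange` are hypothetical
names, and the fetched records are reduced to a plain list of offsets. It only
accepts the data when there is exactly one entry per requested offset, in
order, which matters because offsets can be missing from a log (for example
after compaction).

```java
import java.util.List;

// Hypothetical helper, not part of the PR: checks that the fetched offsets
// cover the requested range [firstOffset, lastOffset] with no gaps.
final class OffsetCoverageSketch {

    static boolean coversRange(List<Long> fetchedOffsets, long firstOffset, long lastOffset) {
        long expectedCount = lastOffset - firstOffset + 1;
        if (expectedCount <= 0 || fetchedOffsets.size() != expectedCount) {
            return false; // empty/invalid range, or too few/too many records
        }
        long expected = firstOffset;
        for (long offset : fetchedOffsets) {
            if (offset != expected) {
                return false; // gap or out-of-order record
            }
            expected++;
        }
        return true;
    }
}
```

   With a check like this done once after the fetch, the per-record
`originalRecordData.size() > recordIndex` guard would no longer be needed.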



##########
server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -231,6 +251,7 @@ public ProduceRequestHandler(
         ) {
             this.param = param;
             this.result = result;
+            this.originalRecordData = new ArrayList<>();

Review Comment:
   Shouldn't we initialize the ArrayList lazily, only when original record data
capture is enabled?
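
   Roughly what I have in mind, as a sketch only. `LazyRecordDataSketch`, the
`copyRecordEnabled` flag and `capture` are hypothetical stand-ins for the
handler and the per-group config check; the point is that the field starts as
the shared immutable empty list and a real ArrayList is allocated only on the
first captured record.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the request handler, not the PR's class.
final class LazyRecordDataSketch {
    private final boolean copyRecordEnabled;
    // Shared immutable empty list: nothing is allocated per handler up front.
    private List<byte[]> originalRecordData = List.of();

    LazyRecordDataSketch(boolean copyRecordEnabled) {
        this.copyRecordEnabled = copyRecordEnabled;
    }

    void capture(byte[] value) {
        if (!copyRecordEnabled) {
            return; // feature disabled: never allocate
        }
        if (originalRecordData.isEmpty()) {
            originalRecordData = new ArrayList<>(); // first record: allocate now
        }
        originalRecordData.add(value);
    }

    List<byte[]> originalRecordData() {
        return originalRecordData;
    }
}
```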



##########
server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -653,6 +684,69 @@ private void handleProduceResponse(ClientResponse 
response) {
                     requestErrorResponse(clientResponseError.exception());
             }
         }
+
+        private void maybeFetchRecordData() {
+            if (cacheHelper.isShareGroupDlqCopyRecordEnabled(param.groupId())) 
{
+                if (!originalRecordData.isEmpty()) {
+                    return;
+                }

Review Comment:
   When can this happen? A comment in the code would be helpful.



##########
server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -653,6 +684,69 @@ private void handleProduceResponse(ClientResponse 
response) {
                     requestErrorResponse(clientResponseError.exception());
             }
         }
+
+        private void maybeFetchRecordData() {
+            if (cacheHelper.isShareGroupDlqCopyRecordEnabled(param.groupId())) 
{
+                if (!originalRecordData.isEmpty()) {
+                    return;
+                }
+
+                long startTime = time.hiResClockMs();
+                TopicIdPartition tp = param.topicIdPartition();
+
+                FetchParams fetchParams = new FetchParams(
+                    FetchRequest.CONSUMER_REPLICA_ID,           // -1, reading 
as a consumer
+                    -1,                                         // replicaEpoch
+                    0L,                                         // maxWaitMs - 
don't block
+                    1,                                          // minBytes
+                    maxFetchBytes,                              // maxBytes
+                    FetchIsolation.HIGH_WATERMARK,              // committed 
only
+                    Optional.empty()                            // 
clientMetadata
+                );
+
+                long nextOffset = param.firstOffset();
+                long endOffset = param.lastOffset();
+                int recordCount = (int) (param.lastOffset() - 
param.firstOffset() + 1);
+
+                List<Record> records = new ArrayList<>(recordCount);
+                LinkedHashMap<TopicIdPartition, Long> offsets = new 
LinkedHashMap<>();
+                LinkedHashMap<TopicIdPartition, Integer> maxBytesMap = new 
LinkedHashMap<>();
+                maxBytesMap.put(tp, maxFetchBytes);
+
+                while (nextOffset <= endOffset) {

Review Comment:
   It makes sense to keep fetching until all offsets are fetched, but could the
while loop continue indefinitely if no new data arrives? Should we add a safety
check?
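
   One possible shape for such a guard, as a sketch under assumptions:
`BoundedFetchLoopSketch`, `fetchRange`, `maxAttempts` and the `fetchFrom`
callback are all hypothetical, and the log read is reduced to a function that
returns the offsets it managed to read from a given start offset. The loop
stops either when an iteration makes no progress or when an attempt budget is
exhausted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

// Hypothetical sketch, not the PR's code.
final class BoundedFetchLoopSketch {

    static List<Long> fetchRange(long firstOffset, long lastOffset, int maxAttempts,
                                 LongFunction<List<Long>> fetchFrom) {
        List<Long> fetched = new ArrayList<>();
        long nextOffset = firstOffset;
        int attempts = 0;
        while (nextOffset <= lastOffset && attempts < maxAttempts) {
            attempts++;
            long before = nextOffset;
            for (long offset : fetchFrom.apply(nextOffset)) {
                // Keep only offsets inside the requested range that move us forward.
                if (offset >= nextOffset && offset <= lastOffset) {
                    fetched.add(offset);
                    nextOffset = offset + 1;
                }
            }
            if (nextOffset == before) {
                break; // no progress in this iteration: stop instead of spinning
            }
        }
        return fetched;
    }
}
```

   The caller can then compare the result against the requested range and
decide whether to fall back to sending the record without the original data.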



##########
server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -653,6 +684,69 @@ private void handleProduceResponse(ClientResponse 
response) {
                     requestErrorResponse(clientResponseError.exception());
             }
         }
+
+        private void maybeFetchRecordData() {
+            if (cacheHelper.isShareGroupDlqCopyRecordEnabled(param.groupId())) 
{
+                if (!originalRecordData.isEmpty()) {
+                    return;
+                }
+
+                long startTime = time.hiResClockMs();
+                TopicIdPartition tp = param.topicIdPartition();
+
+                FetchParams fetchParams = new FetchParams(
+                    FetchRequest.CONSUMER_REPLICA_ID,           // -1, reading 
as a consumer
+                    -1,                                         // replicaEpoch
+                    0L,                                         // maxWaitMs - 
don't block
+                    1,                                          // minBytes
+                    maxFetchBytes,                              // maxBytes
+                    FetchIsolation.HIGH_WATERMARK,              // committed 
only
+                    Optional.empty()                            // 
clientMetadata
+                );
+
+                long nextOffset = param.firstOffset();
+                long endOffset = param.lastOffset();
+                int recordCount = (int) (param.lastOffset() - 
param.firstOffset() + 1);
+
+                List<Record> records = new ArrayList<>(recordCount);
+                LinkedHashMap<TopicIdPartition, Long> offsets = new 
LinkedHashMap<>();
+                LinkedHashMap<TopicIdPartition, Integer> maxBytesMap = new 
LinkedHashMap<>();
+                maxBytesMap.put(tp, maxFetchBytes);
+
+                while (nextOffset <= endOffset) {
+                    offsets.put(tp, nextOffset);

Review Comment:
   It would be good to add a comment saying that each fetch is for the same
topic partition, so we just update nextOffset in the map without clearing it.
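
   For reference, a small sketch of the behaviour the comment should describe.
The names are hypothetical and a String key stands in for TopicIdPartition:
because the key is the same on every iteration, `put` replaces the previous
offset and the map never holds more than one entry.

```java
import java.util.LinkedHashMap;

// Hypothetical sketch, not the PR's code.
final class OffsetMapReuseSketch {

    static LinkedHashMap<String, Long> advance(long firstOffset, long lastOffset) {
        LinkedHashMap<String, Long> offsets = new LinkedHashMap<>();
        String tp = "topic-0"; // stand-in for the single TopicIdPartition being read
        for (long nextOffset = firstOffset; nextOffset <= lastOffset; nextOffset++) {
            // Same key each time: put() overwrites the old offset, so no clear() is needed.
            offsets.put(tp, nextOffset);
        }
        return offsets;
    }
}
```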



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
