apoorvmittal10 commented on code in PR #22479:
URL: https://github.com/apache/kafka/pull/22479#discussion_r3389620766


##########
server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -82,12 +91,14 @@ public class ShareGroupDLQStateManager {
     private final Time time;
     private final Timer timer;
     private final ShareGroupDLQMetadataCacheHelper cacheHelper;
+    private final LogReader logReader;
     private final ShareGroupMetrics shareGroupMetrics;
     public static final long REQUEST_BACKOFF_MS = 1_000L;
     public static final long REQUEST_BACKOFF_MAX_MS = 30_000L;
     private static final int MAX_REQUEST_ATTEMPTS = 5;
     private static final int RETRY_BACKOFF_EXP_BASE = 
CommonClientConfigs.RETRY_BACKOFF_EXP_BASE;
     private static final double RETRY_BACKOFF_JITTER = 
CommonClientConfigs.RETRY_BACKOFF_JITTER;
+    private static final int DLQ_MAX_FETCH_BYTES = 1024 * 1024;

Review Comment:
   Can you please write the code comment here about the choice of 1 MB as what 
we discussed in other comments.



##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -772,13 +777,7 @@ class BrokerServer(
     if (config.shareGroupConfig.shareGroupDLQManagerClassName.nonEmpty) {
       val klass = 
Utils.loadClass(config.shareGroupConfig.shareGroupDLQManagerClassName, 
classOf[Object]).asInstanceOf[Class[ShareGroupDLQManager]]
       if (klass.getName.equals(classOf[DefaultShareGroupDLQManager].getName)) {
-        DefaultShareGroupDLQManager.instance(
-          NetworkUtils.buildNetworkClient("ShareGroupDLQManager", config, 
metrics, Time.SYSTEM, new LogContext(s"[ShareGroupDLQManager 
broker=${config.brokerId}]")),
-          new ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key => 
shareCoordinator.partitionFor(key), config.interBrokerListenerName, 
groupConfigManager),
-          Time.SYSTEM,
-          shareGroupTimer,
-          shareGroupMetrics
-        )
+        
DefaultShareGroupDLQManager.instance(NetworkUtils.buildNetworkClient("ShareGroupDLQManager",
 config, metrics, Time.SYSTEM, new LogContext(s"[ShareGroupDLQManager 
broker=${config.brokerId}]")), new 
ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key => 
shareCoordinator.partitionFor(key), config.interBrokerListenerName, 
groupConfigManager), Time.SYSTEM, shareGroupTimer, shareGroupMetrics, 
shareGroupLogReader)

Review Comment:
   Can we write the parameters in separate line as it was earlier for 
readability.



##########
server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -653,6 +684,80 @@ private void handleProduceResponse(ClientResponse 
response) {
                     requestErrorResponse(clientResponseError.exception());
             }
         }
+
+        private void maybeFetchRecordData() {
+            if (cacheHelper.isShareGroupDlqCopyRecordEnabled(param.groupId())) 
{
+                // A non-null originalRecordData indicates that the data for 
the offsets was
+                // already fetched at a previous time. This could happen in 
case there was
+                // a retriable exception in a previous produce request, and it 
is being re-sent.
+                // This optimization will help in reducing LogReader.read 
calls. Note that an
+                // empty (but non-null) map means a previous fetch found no 
records in range
+                // (e.g. all offsets compacted away), so we still skip 
re-fetching in that case.
+                if (originalRecordData != null) {
+                    return;
+                }

Review Comment:
   Sorry, to ask again. Are the originalDataRecords be long lived in broker 
memory, if yes then how long?



##########
server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -214,6 +231,7 @@ class ProduceRequestHandler implements 
RequestCompletionHandler {
         private Node dlqPartitionLeaderNode;
         private int dlqDestinationPartition;
         private ShareGroupDLQMetadataCacheHelper.TopicPartitionData 
dlqTopicPartitionData;
+        private Map<Long, Record> originalRecordData;

Review Comment:
   Can you help me understand the benefit of keeping the originalRecordData as 
instance level and not fetching the data for offsets locally while processing a 
DLQ message from a queue?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to