apoorvmittal10 commented on code in PR #22479:
URL: https://github.com/apache/kafka/pull/22479#discussion_r3389620766
##########
server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -82,12 +91,14 @@ public class ShareGroupDLQStateManager {
private final Time time;
private final Timer timer;
private final ShareGroupDLQMetadataCacheHelper cacheHelper;
+ private final LogReader logReader;
private final ShareGroupMetrics shareGroupMetrics;
public static final long REQUEST_BACKOFF_MS = 1_000L;
public static final long REQUEST_BACKOFF_MAX_MS = 30_000L;
private static final int MAX_REQUEST_ATTEMPTS = 5;
private static final int RETRY_BACKOFF_EXP_BASE =
CommonClientConfigs.RETRY_BACKOFF_EXP_BASE;
private static final double RETRY_BACKOFF_JITTER =
CommonClientConfigs.RETRY_BACKOFF_JITTER;
+ private static final int DLQ_MAX_FETCH_BYTES = 1024 * 1024;
Review Comment:
Can you please write the code comment here about the choice of 1 MB as what
we discussed in other comments.
##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -772,13 +777,7 @@ class BrokerServer(
if (config.shareGroupConfig.shareGroupDLQManagerClassName.nonEmpty) {
val klass =
Utils.loadClass(config.shareGroupConfig.shareGroupDLQManagerClassName,
classOf[Object]).asInstanceOf[Class[ShareGroupDLQManager]]
if (klass.getName.equals(classOf[DefaultShareGroupDLQManager].getName)) {
- DefaultShareGroupDLQManager.instance(
- NetworkUtils.buildNetworkClient("ShareGroupDLQManager", config,
metrics, Time.SYSTEM, new LogContext(s"[ShareGroupDLQManager
broker=${config.brokerId}]")),
- new ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key =>
shareCoordinator.partitionFor(key), config.interBrokerListenerName,
groupConfigManager),
- Time.SYSTEM,
- shareGroupTimer,
- shareGroupMetrics
- )
+
DefaultShareGroupDLQManager.instance(NetworkUtils.buildNetworkClient("ShareGroupDLQManager",
config, metrics, Time.SYSTEM, new LogContext(s"[ShareGroupDLQManager
broker=${config.brokerId}]")), new
ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key =>
shareCoordinator.partitionFor(key), config.interBrokerListenerName,
groupConfigManager), Time.SYSTEM, shareGroupTimer, shareGroupMetrics,
shareGroupLogReader)
Review Comment:
Can we write the parameters in separate line as it was earlier for
readability.
##########
server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -653,6 +684,80 @@ private void handleProduceResponse(ClientResponse
response) {
requestErrorResponse(clientResponseError.exception());
}
}
+
+ private void maybeFetchRecordData() {
+ if (cacheHelper.isShareGroupDlqCopyRecordEnabled(param.groupId()))
{
+ // A non-null originalRecordData indicates that the data for
the offsets was
+ // already fetched at a previous time. This could happen in
case there was
+ // a retriable exception in a previous produce request, and it
is being re-sent.
+ // This optimization will help in reducing LogReader.read
calls. Note that an
+ // empty (but non-null) map means a previous fetch found no
records in range
+ // (e.g. all offsets compacted away), so we still skip
re-fetching in that case.
+ if (originalRecordData != null) {
+ return;
+ }
Review Comment:
Sorry, to ask again. Are the originalDataRecords be long lived in broker
memory, if yes then how long?
##########
server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -214,6 +231,7 @@ class ProduceRequestHandler implements
RequestCompletionHandler {
private Node dlqPartitionLeaderNode;
private int dlqDestinationPartition;
private ShareGroupDLQMetadataCacheHelper.TopicPartitionData
dlqTopicPartitionData;
+ private Map<Long, Record> originalRecordData;
Review Comment:
Can you help me understand the benefit of keeping the originalRecordData as
instance level and not fetching the data for offsets locally while processing a
DLQ message from a queue?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]