apoorvmittal10 commented on code in PR #22479:
URL: https://github.com/apache/kafka/pull/22479#discussion_r3361829394
##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -777,7 +777,9 @@ class BrokerServer(
new ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key =>
shareCoordinator.partitionFor(key), config.interBrokerListenerName,
groupConfigManager),
Time.SYSTEM,
shareGroupTimer,
- shareGroupMetrics
+ shareGroupMetrics,
+ config.getInt(ServerConfigs.FETCH_MAX_BYTES_CONFIG),
+ new ReplicaManagerLogReader(replicaManager)
Review Comment:
There should already be a ReplicaManagerLogReader created for
SharePartitionManager; why not reuse it?
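The following is a minimal sketch of the reuse suggested above. It is written in Java with hypothetical stand-in types (`LogReader`, `PartitionManager`, `DlqStateManager`), not the real Kafka classes. The reader is constructed once, and the same instance is passed to both components.

```java
import java.util.Objects;

public class SharedReaderWiring {
    // Hypothetical stand-in for ReplicaManagerLogReader; not the real Kafka class.
    interface LogReader {
        String read(long offset);
    }

    // Hypothetical stand-in for SharePartitionManager.
    static final class PartitionManager {
        final LogReader reader;

        PartitionManager(LogReader reader) {
            this.reader = Objects.requireNonNull(reader);
        }
    }

    // Hypothetical stand-in for the DLQ state manager.
    static final class DlqStateManager {
        final LogReader reader;

        DlqStateManager(LogReader reader) {
            this.reader = Objects.requireNonNull(reader);
        }
    }

    public static void main(String[] args) {
        // Build the reader once and pass the same instance to both components.
        LogReader sharedReader = offset -> "record@" + offset;
        PartitionManager partitionManager = new PartitionManager(sharedReader);
        DlqStateManager dlqStateManager = new DlqStateManager(sharedReader);
        System.out.println(partitionManager.reader == dlqStateManager.reader); // prints true
    }
}
```

In `BrokerServer.scala`, this would amount to binding the existing reader to a `val` and passing that value in both places. That assumes the reader holds no per-caller state.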
##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -777,7 +777,9 @@ class BrokerServer(
new ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key =>
shareCoordinator.partitionFor(key), config.interBrokerListenerName,
groupConfigManager),
Time.SYSTEM,
shareGroupTimer,
- shareGroupMetrics
+ shareGroupMetrics,
+ config.getInt(ServerConfigs.FETCH_MAX_BYTES_CONFIG),
Review Comment:
Are the reads going to be for multiple offsets or one at a time? I expect
the data for a specific offset to be fetched, correct? If so, do we need
55 MB as the max bytes config?
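For scale, the 55 MB in the question is 55 × 1024 × 1024 = 57,671,680 bytes. If reads cover a single offset range, one option is to cap the DLQ read size separately, as sketched here. `DLQ_COPY_FETCH_MAX_BYTES` and its 1 MB value are hypothetical and chosen only for illustration.

```java
public class DlqFetchSizing {
    // 55 MB expressed in bytes, matching the figure in the review question.
    static final int FETCH_MAX_BYTES_DEFAULT = 55 * 1024 * 1024;

    // Hypothetical smaller cap for DLQ copy reads; the value is illustrative only.
    static final int DLQ_COPY_FETCH_MAX_BYTES = 1024 * 1024;

    // Use the smaller of the configured broker limit and the DLQ-specific cap.
    static int dlqFetchMaxBytes(int configuredFetchMaxBytes) {
        return Math.min(configuredFetchMaxBytes, DLQ_COPY_FETCH_MAX_BYTES);
    }

    public static void main(String[] args) {
        System.out.println(FETCH_MAX_BYTES_DEFAULT);                   // prints 57671680
        System.out.println(dlqFetchMaxBytes(FETCH_MAX_BYTES_DEFAULT)); // prints 1048576
    }
}
```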
##########
server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -345,10 +366,20 @@ public void populateDLQTopicData() throws ConfigException {
}
public ProduceRequestData.TopicProduceData topicProduceData() {
+ maybeFetchRecordData();
+
List<SimpleRecord> simpleRecords = new ArrayList<>();
for (long i = param.firstOffset(); i <= param.lastOffset(); i++) {
long timestamp = time.hiResClockMs();
- simpleRecords.add(new SimpleRecord(timestamp, (byte[]) null, null, headers(i)));
+ int recordIndex = (int) (i - param.firstOffset());
+ ByteBuffer key = null;
+ ByteBuffer value = null;
+ if (originalRecordData.size() > recordIndex) {
Review Comment:
What's the significance of this check? Don't you expect that, if
originalRecordData is not empty, data for all offsets is present? If so,
wouldn't an if (!originalRecordData.isEmpty()) check be sufficient?
Also, where do we validate that originalRecordData contains data for every
requested offset?
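One way to make that validation explicit is a helper that checks whether the fetched data covers every requested offset exactly once. This sketch uses a hypothetical `FetchedRecord` type. It rejects gaps, which may be too strict if offsets can legitimately be missing (for example, on a compacted topic).

```java
import java.util.List;

public class OffsetCoverageCheck {
    // Hypothetical minimal stand-in for a fetched record; only the offset matters here.
    record FetchedRecord(long offset) { }

    // Returns true only if records hold exactly one entry per offset in
    // [firstOffset, lastOffset], in ascending order with no gaps.
    static boolean coversRange(List<FetchedRecord> records, long firstOffset, long lastOffset) {
        long expectedCount = lastOffset - firstOffset + 1;
        if (records.size() != expectedCount) {
            return false;
        }
        for (int i = 0; i < records.size(); i++) {
            if (records.get(i).offset() != firstOffset + i) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<FetchedRecord> complete = List.of(
            new FetchedRecord(10), new FetchedRecord(11), new FetchedRecord(12));
        List<FetchedRecord> withGap = List.of(new FetchedRecord(10), new FetchedRecord(12));
        System.out.println(coversRange(complete, 10, 12)); // prints true
        System.out.println(coversRange(withGap, 10, 12));  // prints false
    }
}
```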
##########
server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -231,6 +251,7 @@ public ProduceRequestHandler(
) {
this.param = param;
this.result = result;
+ this.originalRecordData = new ArrayList<>();
Review Comment:
Shouldn't we lazily initialize the ArrayList, only when original record
data capture is enabled?
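Here is a sketch of the lazy initialization the comment suggests. It assumes a hypothetical `copyRecordEnabled` flag evaluated when the handler is built, and it uses `String` in place of the real record type.

```java
import java.util.ArrayList;
import java.util.List;

public class LazyRecordDataHolder {
    // Hypothetical flag; in the PR the check goes through cacheHelper.
    private final boolean copyRecordEnabled;

    // Left null until capture is enabled and the list is first needed,
    // so handlers with the feature disabled allocate nothing.
    private List<String> originalRecordData;

    LazyRecordDataHolder(boolean copyRecordEnabled) {
        this.copyRecordEnabled = copyRecordEnabled;
    }

    // Returns the mutable backing list when capture is enabled,
    // otherwise an immutable empty list.
    List<String> recordData() {
        if (!copyRecordEnabled) {
            return List.of();
        }
        if (originalRecordData == null) {
            originalRecordData = new ArrayList<>();
        }
        return originalRecordData;
    }

    boolean isAllocated() {
        return originalRecordData != null;
    }

    public static void main(String[] args) {
        LazyRecordDataHolder disabled = new LazyRecordDataHolder(false);
        disabled.recordData();
        System.out.println(disabled.isAllocated()); // prints false

        LazyRecordDataHolder enabled = new LazyRecordDataHolder(true);
        enabled.recordData().add("record-0");
        System.out.println(enabled.recordData()); // prints [record-0]
    }
}
```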
##########
server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -653,6 +684,69 @@ private void handleProduceResponse(ClientResponse response) {
requestErrorResponse(clientResponseError.exception());
}
}
+
+ private void maybeFetchRecordData() {
+ if (cacheHelper.isShareGroupDlqCopyRecordEnabled(param.groupId())) {
+ if (!originalRecordData.isEmpty()) {
+ return;
+ }
Review Comment:
When can this happen? A comment in the code would be helpful.
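One case where the early return would matter is a second call to `topicProduceData()` on the same handler. The diff does not say whether that happens (for example, on a retry), so treat this as an assumption. The sketch also shows why a dedicated flag may be safer than `isEmpty()`: with `isEmpty()`, an empty result would trigger another fetch. The `fetched` flag and the `Supplier` standing in for the log read are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class FetchOnce {
    // Hypothetical stand-in for the log read performed by maybeFetchRecordData().
    private final Supplier<List<String>> fetcher;
    private List<String> originalRecordData = new ArrayList<>();
    // Records that a fetch has already happened, independent of its result.
    private boolean fetched = false;
    private int fetchCalls = 0;

    FetchOnce(Supplier<List<String>> fetcher) {
        this.fetcher = fetcher;
    }

    void maybeFetchRecordData() {
        // Return early if an earlier call already fetched. Checking
        // originalRecordData.isEmpty() alone would re-fetch whenever the
        // previous fetch returned no records.
        if (fetched) {
            return;
        }
        fetchCalls++;
        originalRecordData = new ArrayList<>(fetcher.get());
        fetched = true;
    }

    int fetchCalls() {
        return fetchCalls;
    }

    List<String> recordData() {
        return originalRecordData;
    }

    public static void main(String[] args) {
        FetchOnce handler = new FetchOnce(() -> List.of());
        handler.maybeFetchRecordData();
        handler.maybeFetchRecordData();
        System.out.println(handler.fetchCalls()); // prints 1
    }
}
```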
##########
server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -653,6 +684,69 @@ private void handleProduceResponse(ClientResponse response) {
requestErrorResponse(clientResponseError.exception());
}
}
+
+ private void maybeFetchRecordData() {
+ if (cacheHelper.isShareGroupDlqCopyRecordEnabled(param.groupId())) {
+ if (!originalRecordData.isEmpty()) {
+ return;
+ }
+
+ long startTime = time.hiResClockMs();
+ TopicIdPartition tp = param.topicIdPartition();
+
+ FetchParams fetchParams = new FetchParams(
+ FetchRequest.CONSUMER_REPLICA_ID, // -1, reading as a consumer
+ -1, // replicaEpoch
+ 0L, // maxWaitMs - don't block
+ 1, // minBytes
+ maxFetchBytes, // maxBytes
+ FetchIsolation.HIGH_WATERMARK, // committed only
+ Optional.empty() // clientMetadata
+ );
+
+ long nextOffset = param.firstOffset();
+ long endOffset = param.lastOffset();
+ int recordCount = (int) (param.lastOffset() - param.firstOffset() + 1);
+
+ List<Record> records = new ArrayList<>(recordCount);
+ LinkedHashMap<TopicIdPartition, Long> offsets = new LinkedHashMap<>();
+ LinkedHashMap<TopicIdPartition, Integer> maxBytesMap = new LinkedHashMap<>();
+ maxBytesMap.put(tp, maxFetchBytes);
+
+ while (nextOffset <= endOffset) {
Review Comment:
It makes sense to keep fetching until all offsets are fetched, but I was
wondering whether there could be an issue where no new data arrives and the
while loop keeps spinning. Should we add a safety check?
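Here is a sketch of one possible safety check: count consecutive reads that make no progress and stop after a limit. `fetcher` and `maxEmptyFetches` are hypothetical. A deadline derived from the existing `startTime` would be an alternative, but the diff does not show how `startTime` is used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

public class BoundedFetchLoop {
    // Reads [firstOffset, lastOffset] via a hypothetical fetcher that maps a start
    // offset to the offsets it returned (ascending, possibly empty). Stops after
    // maxEmptyFetches consecutive reads that make no progress.
    static List<Long> fetchRange(LongFunction<List<Long>> fetcher,
                                 long firstOffset, long lastOffset, int maxEmptyFetches) {
        List<Long> collected = new ArrayList<>();
        long nextOffset = firstOffset;
        int emptyFetches = 0;
        while (nextOffset <= lastOffset) {
            long before = nextOffset;
            for (long offset : fetcher.apply(nextOffset)) {
                if (offset < nextOffset) {
                    continue; // a returned batch may start before the requested offset
                }
                if (offset > lastOffset) {
                    nextOffset = offset; // past the requested range: nothing left to read
                    break;
                }
                collected.add(offset);
                nextOffset = offset + 1;
            }
            if (nextOffset == before) {
                // No progress on this read; give up after too many in a row
                // instead of spinning forever.
                if (++emptyFetches >= maxEmptyFetches) {
                    break;
                }
            } else {
                emptyFetches = 0;
            }
        }
        return collected;
    }

    public static void main(String[] args) {
        // Each read returns two consecutive offsets.
        System.out.println(fetchRange(start -> List.of(start, start + 1), 10, 14, 3)); // prints [10, 11, 12, 13, 14]
        // A log that never returns data: the loop exits after 3 empty reads.
        System.out.println(fetchRange(start -> List.of(), 10, 14, 3)); // prints []
    }
}
```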
##########
server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -653,6 +684,69 @@ private void handleProduceResponse(ClientResponse response) {
requestErrorResponse(clientResponseError.exception());
}
}
+
+ private void maybeFetchRecordData() {
+ if (cacheHelper.isShareGroupDlqCopyRecordEnabled(param.groupId())) {
+ if (!originalRecordData.isEmpty()) {
+ return;
+ }
+
+ long startTime = time.hiResClockMs();
+ TopicIdPartition tp = param.topicIdPartition();
+
+ FetchParams fetchParams = new FetchParams(
+ FetchRequest.CONSUMER_REPLICA_ID, // -1, reading as a consumer
+ -1, // replicaEpoch
+ 0L, // maxWaitMs - don't block
+ 1, // minBytes
+ maxFetchBytes, // maxBytes
+ FetchIsolation.HIGH_WATERMARK, // committed only
+ Optional.empty() // clientMetadata
+ );
+
+ long nextOffset = param.firstOffset();
+ long endOffset = param.lastOffset();
+ int recordCount = (int) (param.lastOffset() - param.firstOffset() + 1);
+
+ List<Record> records = new ArrayList<>(recordCount);
+ LinkedHashMap<TopicIdPartition, Long> offsets = new LinkedHashMap<>();
+ LinkedHashMap<TopicIdPartition, Integer> maxBytesMap = new LinkedHashMap<>();
+ maxBytesMap.put(tp, maxFetchBytes);
+
+ while (nextOffset <= endOffset) {
+ offsets.put(tp, nextOffset);
Review Comment:
It would be good to add a comment noting that each fetch is for the same
topic partition, so we only update nextOffset without clearing the map.
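The following sketch shows the behavior the suggested comment would document. It uses a hypothetical `String` key in place of `TopicIdPartition`. Repeated `put` calls for the same key overwrite the value, so the map holds a single entry with the latest offset.

```java
import java.util.LinkedHashMap;

public class SameKeyOffsetUpdate {
    // Simulates successive fetches for one partition. The String key is a
    // hypothetical stand-in for TopicIdPartition.
    static LinkedHashMap<String, Long> advance(String tp, long firstOffset, long lastOffset) {
        LinkedHashMap<String, Long> offsets = new LinkedHashMap<>();
        for (long nextOffset = firstOffset; nextOffset <= lastOffset; nextOffset++) {
            // Every fetch targets the same partition, so put() overwrites the previous
            // offset for that key; the map never needs to be cleared between fetches.
            offsets.put(tp, nextOffset);
        }
        return offsets;
    }

    public static void main(String[] args) {
        System.out.println(advance("topic-0", 100, 102)); // prints {topic-0=102}
    }
}
```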
--
This is an automated message from the Apache Git Service.