smjn commented on code in PR #22479:
URL: https://github.com/apache/kafka/pull/22479#discussion_r3367014782
##########
server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -345,10 +366,20 @@ public void populateDLQTopicData() throws ConfigException
{
}
public ProduceRequestData.TopicProduceData topicProduceData() {
+ maybeFetchRecordData();
+
List<SimpleRecord> simpleRecords = new ArrayList<>();
for (long i = param.firstOffset(); i <= param.lastOffset(); i++) {
long timestamp = time.hiResClockMs();
- simpleRecords.add(new SimpleRecord(timestamp, (byte[]) null,
null, headers(i)));
+ int recordIndex = (int) (i - param.firstOffset());
+ ByteBuffer key = null;
+ ByteBuffer value = null;
+ if (originalRecordData.size() > recordIndex) {
Review Comment:
I realised, there could be a bug here if the offsets are sparsely filled due
to compaction on the user topic - replacing this DS with a hashmap and adding
the suggested non-null check should be sufficient.
##########
server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -345,10 +366,20 @@ public void populateDLQTopicData() throws ConfigException
{
}
public ProduceRequestData.TopicProduceData topicProduceData() {
+ maybeFetchRecordData();
+
List<SimpleRecord> simpleRecords = new ArrayList<>();
for (long i = param.firstOffset(); i <= param.lastOffset(); i++) {
long timestamp = time.hiResClockMs();
- simpleRecords.add(new SimpleRecord(timestamp, (byte[]) null,
null, headers(i)));
+ int recordIndex = (int) (i - param.firstOffset());
+ ByteBuffer key = null;
+ ByteBuffer value = null;
+ if (originalRecordData.size() > recordIndex) {
Review Comment:
I realised there could be a bug here if the offsets are sparsely filled due
to compaction on the user topic - replacing this DS with a hashmap and adding
the suggested non-null check should be sufficient.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]