cadonna commented on code in PR #18739: URL: https://github.com/apache/kafka/pull/18739#discussion_r1935239878
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -259,6 +259,10 @@ public <K, V> void send(final String topic,
         final ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);
+        // As many records could be in-flight,
+        // freeing raw records in the context to reduce memory pressure
+        freeContext(context);

Review Comment:
   The name of this method is a bit misleading. It frees only the raw record within the context, not the whole context. What about calling it `freeRawInputRecordFromContext()`?

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java:
##########
@@ -55,8 +69,26 @@ public Headers headers() {
         return value.headers();
     }
 
+    public byte[] rawKey() {
+        return rawKey;
+    }
+
+    public byte[] rawValue() {
+        return rawValue;
+    }
+
     @Override
     public String toString() {
         return value.toString() + ", timestamp = " + timestamp;
     }
+
+    @Override
+    public boolean equals(final Object other) {
+        return super.equals(other);
+    }
+
+    @Override
+    public int hashCode() {
+        return super.hashCode();
+    }

Review Comment:
   Why are those needed?

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java:
##########
@@ -48,6 +50,24 @@ public ProcessorRecordContext(final long timestamp,
         this.topic = topic;
         this.partition = partition;
         this.headers = Objects.requireNonNull(headers);
+        this.sourceRawKey = null;
+        this.sourceRawValue = null;

Review Comment:
   You also need to add this information to `serialize()` and `deserialize()` so that the buffer values also get the source record. Here it gets a bit tricky, because you need to consider the case where a serialized record context does not contain the source record because it was written by a version of Streams that did not yet have the source record in the context.
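
The backward-compatibility concern in the last comment can be illustrated with a minimal sketch. Everything here is an assumption made for illustration: `RecordContextSketch`, `serializeLegacy()`, and the byte layout are hypothetical and are not the actual `ProcessorRecordContext` wire format. The idea shown is only that new fields are appended after the old ones, and the reader treats their absence as "no source record".

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a record context; NOT Kafka's real class or format.
final class RecordContextSketch {
    final long timestamp;
    final String topic;
    final byte[] sourceRawKey;   // null when the source record is unavailable
    final byte[] sourceRawValue; // null when the source record is unavailable

    RecordContextSketch(final long timestamp, final String topic,
                        final byte[] sourceRawKey, final byte[] sourceRawValue) {
        this.timestamp = timestamp;
        this.topic = topic;
        this.sourceRawKey = sourceRawKey;
        this.sourceRawValue = sourceRawValue;
    }

    // Assumed "old" layout: timestamp, topic length, topic bytes.
    byte[] serializeLegacy() {
        final byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
        final ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES + topicBytes.length);
        buf.putLong(timestamp).putInt(topicBytes.length).put(topicBytes);
        return buf.array();
    }

    // Assumed "new" layout: the old fields, then the length-prefixed raw key and value.
    byte[] serialize() {
        final byte[] legacy = serializeLegacy();
        final int keyLen = sourceRawKey == null ? 0 : sourceRawKey.length;
        final int valueLen = sourceRawValue == null ? 0 : sourceRawValue.length;
        final ByteBuffer buf = ByteBuffer.allocate(legacy.length + 2 * Integer.BYTES + keyLen + valueLen);
        buf.put(legacy);
        putNullable(buf, sourceRawKey);
        putNullable(buf, sourceRawValue);
        return buf.array();
    }

    // Reads both layouts: bytes written in the old layout simply end after the topic.
    static RecordContextSketch deserialize(final ByteBuffer buf) {
        final long timestamp = buf.getLong();
        final byte[] topicBytes = new byte[buf.getInt()];
        buf.get(topicBytes);
        final String topic = new String(topicBytes, StandardCharsets.UTF_8);

        byte[] rawKey = null;
        byte[] rawValue = null;
        if (buf.hasRemaining()) { // only the new layout carries the source record
            rawKey = getNullable(buf);
            rawValue = getNullable(buf);
        }
        return new RecordContextSketch(timestamp, topic, rawKey, rawValue);
    }

    // A length of -1 marks a null array.
    private static void putNullable(final ByteBuffer buf, final byte[] bytes) {
        if (bytes == null) {
            buf.putInt(-1);
        } else {
            buf.putInt(bytes.length).put(bytes);
        }
    }

    private static byte[] getNullable(final ByteBuffer buf) {
        final int length = buf.getInt();
        if (length < 0) {
            return null;
        }
        final byte[] bytes = new byte[length];
        buf.get(bytes);
        return bytes;
    }
}
```

Note that the `hasRemaining()` check only works when the context is the last thing in the buffer. If other data follows the serialized context, a reader cannot tell old from new bytes this way, and an explicit version marker or flag would likely be needed instead.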