loicgreffier commented on code in PR #18739: URL: https://github.com/apache/kafka/pull/18739#discussion_r2125012007
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java: ########## @@ -1890,6 +1892,69 @@ public void shouldNotSendIfSendOfOtherTaskFailedInCallback() { )); } + @Test + public void shouldFreeRawRecordsInContextBeforeSending() { + final KafkaException exception = new KafkaException("KABOOM!"); + final byte[][] sourceRawData = new byte[][]{new byte[]{}, new byte[]{}}; + + final RecordCollector collector = new RecordCollectorImpl( + logContext, + taskId, + getExceptionalStreamsProducerOnSend(exception), + new ProductionExceptionHandler() { + @Override + public void configure(final Map<String, ?> configs) { + + } + + @Override + public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, final ProducerRecord<byte[], byte[]> record, final Exception exception) { + sourceRawData[0] = context.sourceRawKey(); + sourceRawData[1] = context.sourceRawValue(); + return ProductionExceptionHandlerResponse.CONTINUE; + } + }, + streamsMetrics, + topology + ); + + collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, sinkNodeName, context, streamPartitioner); + collector.flush(); Review Comment: Flush was not necessary in the test, it has been removed. ########## streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java: ########## @@ -110,4 +110,37 @@ public interface RecordContext { */ Headers headers(); + /** + * Return the non-deserialized byte[] of the input message key if the context has been triggered by a message. + * + * <p> If this method is invoked within a {@link Punctuator#punctuate(long) + * punctuation callback}, or while processing a record that was forwarded by a punctuation + * callback, it will return null. + * + * <p> If this method is invoked in a sub-topology due to a repartition, the returned key would be one sent + * to the repartition topic. + * + * <p> Always returns null if this method is invoked within a + * ProductionExceptionHandler.handle(ErrorHandlerContext, ProducerRecord, Exception) Review Comment: Only `ErrorHandlerContext` is given to the `ProductionExceptionHandler#handle`, so this is a bad c&p. I've updated both code and KIP. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org