cadonna commented on code in PR #17169:
URL: https://github.com/apache/kafka/pull/17169#discussion_r1758243292
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -252,40 +254,55 @@ public <K, V> void send(final String topic,
final ProducerRecord<byte[], byte[]> serializedRecord = new
ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);
streamsProducer.send(serializedRecord, (metadata, exception) -> {
- // if there's already an exception record, skip logging offsets or
new exceptions
- if (sendException.get() != null) {
- return;
- }
-
- if (exception == null) {
- final TopicPartition tp = new TopicPartition(metadata.topic(),
metadata.partition());
- if (metadata.offset() >= 0L) {
- offsets.put(tp, metadata.offset());
- } else {
- log.warn("Received offset={} in produce response for {}",
metadata.offset(), tp);
+ try {
+ // if there's already an exception record, skip logging
offsets or new exceptions
+ if (sendException.get() != null) {
+ return;
}
- if (!topic.endsWith("-changelog")) {
- // we may not have created a sensor during initialization
if the node uses dynamic topic routing,
- // as all topics are not known up front, so create the
sensor for this topic if absent
- final Sensor topicProducedSensor =
producedSensorByTopic.computeIfAbsent(
- topic,
- t -> TopicMetrics.producedSensor(
- Thread.currentThread().getName(),
- taskId.toString(),
- processorNodeId,
+ if (exception == null) {
+ final TopicPartition tp = new
TopicPartition(metadata.topic(), metadata.partition());
+ if (metadata.offset() >= 0L) {
+ offsets.put(tp, metadata.offset());
+ } else {
+ log.warn("Received offset={} in produce response for
{}", metadata.offset(), tp);
+ }
+
+ if (!topic.endsWith("-changelog")) {
+ // we may not have created a sensor during
initialization if the node uses dynamic topic routing,
+ // as all topics are not known up front, so create the
sensor for this topic if absent
+ final Sensor topicProducedSensor =
producedSensorByTopic.computeIfAbsent(
topic,
- context.metrics()
- )
+ t -> TopicMetrics.producedSensor(
+ Thread.currentThread().getName(),
+ taskId.toString(),
+ processorNodeId,
+ topic,
+ // no `null` check required, as `context` can
only be null for changelogs what we check above
+ context.metrics()
+ )
+ );
+ final long bytesProduced =
producerRecordSizeInBytes(serializedRecord);
+ topicProducedSensor.record(
+ bytesProduced,
+ // no `null` check required, as `context` can only
be null for changelogs what we check above
+ context.currentSystemTimeMs()
+ );
+ }
+ } else {
+ recordSendError(
+ topic,
+ exception,
+ serializedRecord,
+ context, // ok as-is; `null` check done inside
`recordSendError(...)`
Review Comment:
Could you remove the inline comment, please 🙏?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -252,40 +254,55 @@ public <K, V> void send(final String topic,
final ProducerRecord<byte[], byte[]> serializedRecord = new
ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);
streamsProducer.send(serializedRecord, (metadata, exception) -> {
- // if there's already an exception record, skip logging offsets or
new exceptions
- if (sendException.get() != null) {
- return;
- }
-
- if (exception == null) {
- final TopicPartition tp = new TopicPartition(metadata.topic(),
metadata.partition());
- if (metadata.offset() >= 0L) {
- offsets.put(tp, metadata.offset());
- } else {
- log.warn("Received offset={} in produce response for {}",
metadata.offset(), tp);
+ try {
+ // if there's already an exception record, skip logging
offsets or new exceptions
+ if (sendException.get() != null) {
+ return;
}
- if (!topic.endsWith("-changelog")) {
- // we may not have created a sensor during initialization
if the node uses dynamic topic routing,
- // as all topics are not known up front, so create the
sensor for this topic if absent
- final Sensor topicProducedSensor =
producedSensorByTopic.computeIfAbsent(
- topic,
- t -> TopicMetrics.producedSensor(
- Thread.currentThread().getName(),
- taskId.toString(),
- processorNodeId,
+ if (exception == null) {
+ final TopicPartition tp = new
TopicPartition(metadata.topic(), metadata.partition());
+ if (metadata.offset() >= 0L) {
+ offsets.put(tp, metadata.offset());
+ } else {
+ log.warn("Received offset={} in produce response for
{}", metadata.offset(), tp);
+ }
+
+ if (!topic.endsWith("-changelog")) {
+ // we may not have created a sensor during
initialization if the node uses dynamic topic routing,
+ // as all topics are not known up front, so create the
sensor for this topic if absent
+ final Sensor topicProducedSensor =
producedSensorByTopic.computeIfAbsent(
topic,
- context.metrics()
- )
+ t -> TopicMetrics.producedSensor(
+ Thread.currentThread().getName(),
+ taskId.toString(),
+ processorNodeId,
+ topic,
+ // no `null` check required, as `context` can
only be null for changelogs what we check above
Review Comment:
Could you please remove the inline comment?
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -1580,6 +1580,98 @@ public void
shouldThrowStreamsExceptionWhenKeySerializationFailedAndProductionEx
}
}
+ @Test
+ public void shouldNotFailIfContextIsNotAvailableOnSerializationError() {
+ try (final ErrorStringSerializer errorSerializer = new
ErrorStringSerializer()) {
+ final RecordCollector collector = new RecordCollectorImpl(
+ logContext,
+ taskId,
+ streamsProducer,
+ productionExceptionHandler,
+ streamsMetrics,
+ topology
+ );
+
+ assertThrows(
+ StreamsException.class, // should not crash with
NullPointerException
+ () -> collector.send(
+ topic,
+ "key",
+ "val",
+ null,
+ 0,
+ null,
+ errorSerializer,
+ stringSerializer,
+ sinkNodeName,
+ null // pass `null` context for testing
+ )
+ );
+ }
+ }
+
+ @Test
+ public void
shouldNotFailIfRecordContextIsNotAvailableOnSerializationError() {
+ try (final ErrorStringSerializer errorSerializer = new
ErrorStringSerializer()) {
+ final RecordCollector collector = new RecordCollectorImpl(
+ logContext,
+ taskId,
+ streamsProducer,
+ productionExceptionHandler,
+ streamsMetrics,
+ topology
+ );
+
+ // RecordContext is null when writing into a changelog topic
+ context.setRecordContext(null);
+ assertThrows(
+ StreamsException.class, // should not crash with
NullPointerException
+ () -> collector.send(topic, "key", "val", null, 0, null,
errorSerializer, stringSerializer, sinkNodeName, context)
+ );
+ }
+ }
+
+ @Test
+ public void shouldNotFailIfContextIsNotAvailableOnSendError() {
+ final RecordCollector collector = new RecordCollectorImpl(
+ logContext,
+ taskId,
+ getExceptionalStreamsProducerOnSend(new
RuntimeException("Kaboom!")),
+ productionExceptionHandler,
+ streamsMetrics,
+ topology
+ );
+
+ collector.send(
+ topic,
+ "key",
+ "val",
+ null,
+ 0,
+ null,
+ stringSerializer,
+ stringSerializer,
+ sinkNodeName,
+ null // pass `null` context for testing
+ );
+ }
+
+ @Test
+ public void shouldNotFailIfRecordContextIsNotAvailableOnSendError() {
+ final RecordCollector collector = new RecordCollectorImpl(
+ logContext,
+ taskId,
+ getExceptionalStreamsProducerOnSend(new
RuntimeException("Kaboom!")),
+ productionExceptionHandler,
+ streamsMetrics,
+ topology
+ );
+
+ // RecordContext is null when writing into a changelog topic
+ context.setRecordContext(null);
Review Comment:
See my comment above.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -352,6 +364,33 @@ private <K, V> void handleException(final
ProductionExceptionHandler.Serializati
droppedRecordsSensor.record();
}
+ private DefaultErrorHandlerContext errorHandlerContext(final
InternalProcessorContext<Void, Void> context,
+ final String
processorNodeId) {
+ final RecordContext recordContext = context != null ?
context.recordContext() : null;
+
+ return recordContext != null ?
+ new DefaultErrorHandlerContext(
+ null, // only required to pass for
DeserializationExceptionHandler
Review Comment:
Then maybe it makes sense to have an overload of the constructor instead
of the inline comment.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -1580,6 +1580,98 @@ public void
shouldThrowStreamsExceptionWhenKeySerializationFailedAndProductionEx
}
}
+ @Test
+ public void shouldNotFailIfContextIsNotAvailableOnSerializationError() {
+ try (final ErrorStringSerializer errorSerializer = new
ErrorStringSerializer()) {
+ final RecordCollector collector = new RecordCollectorImpl(
+ logContext,
+ taskId,
+ streamsProducer,
+ productionExceptionHandler,
+ streamsMetrics,
+ topology
+ );
+
+ assertThrows(
+ StreamsException.class, // should not crash with
NullPointerException
+ () -> collector.send(
+ topic,
+ "key",
+ "val",
+ null,
+ 0,
+ null,
+ errorSerializer,
+ stringSerializer,
+ sinkNodeName,
+ null // pass `null` context for testing
Review Comment:
A variable `notAvailableContext = null` would be better than the inline
comment.
##########
streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java:
##########
@@ -42,6 +42,8 @@ public interface ErrorHandlerContext {
* {@link
org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier,
String...)}
* (and siblings), that do not always guarantee to provide a valid topic
name, as they might be
* executed "out-of-band" due to some internal optimizations applied by
the Kafka Streams DSL.
+ * Additionally, when writing into a changelog topic, there is no
associated input record,
+ * and thus no topic name is available.
Review Comment:
The issue probably already starts in the class javadoc, since it only
mentions failed processing.
I would extend that javadoc to describe all three cases in which the error
context is used, and give names to the cases — something like deserialization
exception handling, processing exception handling, and production exception
handling. Then here we could write that, during production exception handling,
no input topic name is available when writing into a changelog topic.
We can do that in a follow-up PR. Let us not block this PR on it.
Comments are always hard to keep up-to-date. That is the reason I do not
like inline comments: they eventually start to lie at some point. Javadocs are
also challenging to keep up-to-date, but it is worth the pain, IMO.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -252,40 +254,55 @@ public <K, V> void send(final String topic,
final ProducerRecord<byte[], byte[]> serializedRecord = new
ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);
streamsProducer.send(serializedRecord, (metadata, exception) -> {
- // if there's already an exception record, skip logging offsets or
new exceptions
- if (sendException.get() != null) {
- return;
- }
-
- if (exception == null) {
- final TopicPartition tp = new TopicPartition(metadata.topic(),
metadata.partition());
- if (metadata.offset() >= 0L) {
- offsets.put(tp, metadata.offset());
- } else {
- log.warn("Received offset={} in produce response for {}",
metadata.offset(), tp);
+ try {
+ // if there's already an exception record, skip logging
offsets or new exceptions
+ if (sendException.get() != null) {
+ return;
}
- if (!topic.endsWith("-changelog")) {
- // we may not have created a sensor during initialization
if the node uses dynamic topic routing,
- // as all topics are not known up front, so create the
sensor for this topic if absent
- final Sensor topicProducedSensor =
producedSensorByTopic.computeIfAbsent(
- topic,
- t -> TopicMetrics.producedSensor(
- Thread.currentThread().getName(),
- taskId.toString(),
- processorNodeId,
+ if (exception == null) {
+ final TopicPartition tp = new
TopicPartition(metadata.topic(), metadata.partition());
+ if (metadata.offset() >= 0L) {
+ offsets.put(tp, metadata.offset());
+ } else {
+ log.warn("Received offset={} in produce response for
{}", metadata.offset(), tp);
+ }
+
+ if (!topic.endsWith("-changelog")) {
+ // we may not have created a sensor during
initialization if the node uses dynamic topic routing,
+ // as all topics are not known up front, so create the
sensor for this topic if absent
+ final Sensor topicProducedSensor =
producedSensorByTopic.computeIfAbsent(
topic,
- context.metrics()
- )
+ t -> TopicMetrics.producedSensor(
+ Thread.currentThread().getName(),
+ taskId.toString(),
+ processorNodeId,
+ topic,
+ // no `null` check required, as `context` can
only be null for changelogs what we check above
+ context.metrics()
+ )
+ );
+ final long bytesProduced =
producerRecordSizeInBytes(serializedRecord);
+ topicProducedSensor.record(
+ bytesProduced,
+ // no `null` check required, as `context` can only
be null for changelogs what we check above
Review Comment:
Please remove.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]