mjsax commented on code in PR #17169:
URL: https://github.com/apache/kafka/pull/17169#discussion_r1757814889
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -252,40 +254,55 @@ public <K, V> void send(final String topic,
final ProducerRecord<byte[], byte[]> serializedRecord = new
ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);
streamsProducer.send(serializedRecord, (metadata, exception) -> {
- // if there's already an exception record, skip logging offsets or
new exceptions
- if (sendException.get() != null) {
- return;
- }
-
- if (exception == null) {
- final TopicPartition tp = new TopicPartition(metadata.topic(),
metadata.partition());
- if (metadata.offset() >= 0L) {
- offsets.put(tp, metadata.offset());
- } else {
- log.warn("Received offset={} in produce response for {}",
metadata.offset(), tp);
+ try {
+ // if there's already an exception record, skip logging
offsets or new exceptions
+ if (sendException.get() != null) {
+ return;
}
- if (!topic.endsWith("-changelog")) {
- // we may not have created a sensor during initialization
if the node uses dynamic topic routing,
- // as all topics are not known up front, so create the
sensor for this topic if absent
- final Sensor topicProducedSensor =
producedSensorByTopic.computeIfAbsent(
- topic,
- t -> TopicMetrics.producedSensor(
- Thread.currentThread().getName(),
- taskId.toString(),
- processorNodeId,
+ if (exception == null) {
+ final TopicPartition tp = new
TopicPartition(metadata.topic(), metadata.partition());
+ if (metadata.offset() >= 0L) {
+ offsets.put(tp, metadata.offset());
+ } else {
+ log.warn("Received offset={} in produce response for
{}", metadata.offset(), tp);
+ }
+
+ if (!topic.endsWith("-changelog")) {
+ // we may not have created a sensor during
initialization if the node uses dynamic topic routing,
+ // as all topics are not known up front, so create the
sensor for this topic if absent
+ final Sensor topicProducedSensor =
producedSensorByTopic.computeIfAbsent(
topic,
- context.metrics()
- )
+ t -> TopicMetrics.producedSensor(
+ Thread.currentThread().getName(),
+ taskId.toString(),
+ processorNodeId,
+ topic,
+ // no `null` check required, as `context` can
only be null for changelogs what we check above
+ context.metrics()
+ )
+ );
+ final long bytesProduced =
producerRecordSizeInBytes(serializedRecord);
+ topicProducedSensor.record(
+ bytesProduced,
+ // no `null` check required, as `context` can only
be null for changelogs what we check above
+ context.currentSystemTimeMs()
+ );
+ }
+ } else {
+ recordSendError(
+ topic,
+ exception,
+ serializedRecord,
+ context,
Review Comment:
```suggestion
context, // ok as-is; `null` check done inside
`recordSendError(...)`
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]