loicgreffier commented on code in PR #16433:
URL: https://github.com/apache/kafka/pull/16433#discussion_r1697519314
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -285,15 +280,95 @@ public <K, V> void send(final String topic,
topicProducedSensor.record(bytesProduced,
context.currentSystemTimeMs());
}
} else {
- recordSendError(topic, exception, serializedRecord);
+ recordSendError(topic, exception, serializedRecord, context,
processorNodeId);
// KAFKA-7510 only put message key and value in TRACE level
log so we don't leak data by default
log.trace("Failed record: (key {} value {} timestamp {})
topic=[{}] partition=[{}]", key, value, timestamp, topic, partition);
}
});
}
- private void recordSendError(final String topic, final Exception
exception, final ProducerRecord<byte[], byte[]> serializedRecord) {
+ private <K, V> void handleException(final
ProductionExceptionHandler.SerializationExceptionOrigin origin,
+ final String topic,
+ final K key,
+ final V value,
+ final Headers headers,
+ final Integer partition,
+ final Long timestamp,
+ final String processorNodeId,
+ final InternalProcessorContext<Void,
Void> context,
+ final Exception exception) {
+ final ProducerRecord<K, V> record = new ProducerRecord<>(topic,
partition, timestamp, key, value, headers);
+ final ProductionExceptionHandlerResponse response;
+
+ log.debug(String.format("Error serializing record to topic %s",
topic), exception);
+
+ try {
+ final DefaultErrorHandlerContext errorHandlerContext = new
DefaultErrorHandlerContext(
+ context.recordContext().topic(),
+ context.recordContext().partition(),
+ context.recordContext().offset(),
+ context.recordContext().headers(),
+ context.recordContext().rawRecord().key(),
+ context.recordContext().rawRecord().value(),
+ processorNodeId,
+ taskId);
+ response =
productionExceptionHandler.handleSerializationException(errorHandlerContext,
record, exception, origin);
+ } catch (final Exception e) {
+ log.error("Fatal when handling serialization exception", e);
+ recordSendError(topic, e, null, context, processorNodeId);
+ return;
+ }
+
+ if (response == ProductionExceptionHandlerResponse.FAIL) {
+ throw new StreamsException(
+ String.format(
+ "Unable to serialize record. ProducerRecord(topic=[%s],
partition=[%d], timestamp=[%d]",
+ topic,
+ partition,
+ timestamp),
+ exception
+ );
+ }
+
+ log.warn("Unable to serialize record, continue processing. " +
+ "ProducerRecord(topic=[{}], partition=[{}],
timestamp=[{}])",
+ topic,
+ partition,
+ timestamp);
+
+ droppedRecordsSensor.record();
+ }
+
+ private <K, V> StreamsException
createStreamsExceptionForClassCastException(final String topic,
Review Comment:
@mjsax Addressing it in https://github.com/apache/kafka/pull/16736
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -285,15 +276,104 @@ public <K, V> void send(final String topic,
topicProducedSensor.record(bytesProduced,
context.currentSystemTimeMs());
}
} else {
- recordSendError(topic, exception, serializedRecord);
+ recordSendError(topic, exception, serializedRecord, context,
processorNodeId);
// KAFKA-7510 only put message key and value in TRACE level
log so we don't leak data by default
log.trace("Failed record: (key {} value {} timestamp {})
topic=[{}] partition=[{}]", key, value, timestamp, topic, partition);
}
});
}
- private void recordSendError(final String topic, final Exception
exception, final ProducerRecord<byte[], byte[]> serializedRecord) {
+ private <K, V> void handleException(final
ProductionExceptionHandler.SerializationExceptionOrigin origin,
+ final String topic,
+ final K key,
+ final V value,
+ final Headers headers,
+ final Integer partition,
+ final Long timestamp,
+ final String processorNodeId,
+ final InternalProcessorContext<Void,
Void> context,
+ final Exception exception) {
+ final ProducerRecord<K, V> record = new ProducerRecord<>(topic,
partition, timestamp, key, value, headers);
+ final ProductionExceptionHandlerResponse response;
+
+ log.debug(String.format("Error serializing record to topic %s",
topic), exception);
+
+ try {
+ final DefaultErrorHandlerContext errorHandlerContext = new
DefaultErrorHandlerContext(
+ context.recordContext().topic(),
+ context.recordContext().partition(),
+ context.recordContext().offset(),
+ context.recordContext().headers(),
+ processorNodeId,
+ taskId);
+ response =
productionExceptionHandler.handleSerializationException(errorHandlerContext,
record, exception, origin);
+ } catch (final Exception e) {
+ log.error("Fatal when handling serialization exception", e);
+ recordSendError(topic, e, null, context, processorNodeId);
+ return;
+ }
+
+ if (response == ProductionExceptionHandlerResponse.FAIL) {
+ throw new StreamsException(
+ String.format(
+ "Unable to serialize record. ProducerRecord(topic=[%s],
partition=[%d], timestamp=[%d]",
+ topic,
+ partition,
+ timestamp),
+ exception
+ );
+ }
+
+ log.warn("Unable to serialize record, continue processing. " +
+ "ProducerRecord(topic=[{}], partition=[{}],
timestamp=[{}])",
+ topic,
+ partition,
+ timestamp);
+
+ droppedRecordsSensor.record();
+ }
+ private <K> StreamsException
createStreamsExceptionForKeyClassCastException(final String topic,
Review Comment:
@mjsax Addressing it in https://github.com/apache/kafka/pull/16736
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]