lucasbru commented on code in PR #21535:
URL: https://github.com/apache/kafka/pull/21535#discussion_r2877263789
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -861,6 +843,12 @@ public class StreamsConfig extends AbstractConfig {
ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
ProducerConfig.TRANSACTIONAL_ID_CONFIG
};
+ @SuppressWarnings("WeakerAccess")
+ public static final String
PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG =
"processing.exception.handler.global.enabled";
Review Comment:
As per the KIP discussion, this should be deprecated.
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1274,7 +1256,12 @@ public class StreamsConfig extends AbstractConfig {
Type.LONG,
null,
Importance.LOW,
- WINDOW_SIZE_MS_DOC);
+ WINDOW_SIZE_MS_DOC)
+ .define(PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG,
+ Type.BOOLEAN,
+ false,
+ Importance.LOW,
Review Comment:
Wasn't the conclusion from the KIP discussion that the importance should be HIGH?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java:
##########
Review Comment:
This comment needs to be updated.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -47,16 +51,10 @@
import java.io.File;
import java.io.IOException;
import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
+import java.util.*;
Review Comment:
We don't use wildcard imports in Kafka; please keep the explicit imports.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##########
@@ -404,6 +405,7 @@ private StateConsumer initialize() {
globalProcessorContext,
stateMgr,
config.deserializationExceptionHandler(),
+ processingExceptionHandler,
Review Comment:
What's the strong incentive to not share the object?
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java:
##########
@@ -404,4 +547,4 @@ private void produceGlobalTableValues() {
new Properties()),
mockTime);
}
-}
+}
Review Comment:
nit: add back the newline at the end of the file
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -311,33 +311,103 @@ private void reprocessState(final List<TopicPartition>
topicPartitions,
record.headers());
globalProcessorContext.setRecordContext(recordContext);
- try {
- if (record.key() != null) {
- source.process(new Record(
+ if (record.key() != null) {
+ // Deserialization phase
+ final Record<?, ?> deserializedRecord;
+ try {
+ deserializedRecord = new Record<>(
reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),
reprocessFactory.valueDeserializer().deserialize(record.topic(),
record.value()),
record.timestamp(),
- record.headers()));
+ record.headers());
+ } catch (final Exception deserializationException) {
+ // while Java distinguishes checked vs unchecked
exceptions, other languages
+ // like Scala or Kotlin do not, and thus we need
to catch `Exception`
+ // (instead of `RuntimeException`) to work well
with those languages
+ handleDeserializationFailure(
+ deserializationExceptionHandler,
+ globalProcessorContext,
+ deserializationException,
+ record,
+ log,
+ droppedRecordsSensor(
+ Thread.currentThread().getName(),
+ globalProcessorContext.taskId().toString(),
+ globalProcessorContext.metrics()
+ ),
+ null
+ );
+ continue; // Skip this record
+ }
+ final ProcessingExceptionHandler.Response response;
+ // Processing phase
+ try {
+ @SuppressWarnings("unchecked")
+ final Processor<Object, Object, Object, Object>
typedSource =
+ (Processor<Object, Object, Object, Object>)
source;
+ @SuppressWarnings("unchecked")
+ final Record<Object, Object> typedRecord =
(Record<Object, Object>) deserializedRecord;
+ typedSource.process(typedRecord);
restoreCount++;
batchRestoreCount++;
+ } catch (final Exception processingException) {
+ // while Java distinguishes checked vs unchecked
exceptions, other languages
+ // like Scala or Kotlin do not, and thus we need
to catch `Exception`
+ // (instead of `RuntimeException`) to work well
with those languages
+ if (processingExceptionHandler != null) {
+ final ErrorHandlerContext errorHandlerContext
= new DefaultErrorHandlerContext(
+ globalProcessorContext,
+ record.topic(),
+ record.partition(),
+ record.offset(),
+ record.headers(),
+ storeName,
+ globalProcessorContext.taskId(),
+ record.timestamp(),
+ record.key(),
+ record.value()
+ );
+ try {
+ response =
+
Objects.requireNonNull(processingExceptionHandler.handleError(
+ errorHandlerContext,
+ deserializedRecord,
+ processingException
+ ), "Invalid
ProcessingExceptionHandler response");
+ log.warn("Dead letter queue records cannot
be sent for GlobalKTable processors " +
Review Comment:
Shouldn't this only be logged if there are actual DLQ records produced?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -311,33 +311,103 @@ private void reprocessState(final List<TopicPartition>
topicPartitions,
record.headers());
globalProcessorContext.setRecordContext(recordContext);
- try {
- if (record.key() != null) {
- source.process(new Record(
+ if (record.key() != null) {
+ // Deserialization phase
+ final Record<?, ?> deserializedRecord;
+ try {
+ deserializedRecord = new Record<>(
reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),
reprocessFactory.valueDeserializer().deserialize(record.topic(),
record.value()),
record.timestamp(),
- record.headers()));
+ record.headers());
+ } catch (final Exception deserializationException) {
+ // while Java distinguishes checked vs unchecked
exceptions, other languages
+ // like Scala or Kotlin do not, and thus we need
to catch `Exception`
+ // (instead of `RuntimeException`) to work well
with those languages
+ handleDeserializationFailure(
+ deserializationExceptionHandler,
+ globalProcessorContext,
+ deserializationException,
+ record,
+ log,
+ droppedRecordsSensor(
+ Thread.currentThread().getName(),
+ globalProcessorContext.taskId().toString(),
+ globalProcessorContext.metrics()
+ ),
+ null
+ );
+ continue; // Skip this record
+ }
+ final ProcessingExceptionHandler.Response response;
+ // Processing phase
+ try {
+ @SuppressWarnings("unchecked")
+ final Processor<Object, Object, Object, Object>
typedSource =
+ (Processor<Object, Object, Object, Object>)
source;
+ @SuppressWarnings("unchecked")
+ final Record<Object, Object> typedRecord =
(Record<Object, Object>) deserializedRecord;
+ typedSource.process(typedRecord);
restoreCount++;
batchRestoreCount++;
+ } catch (final Exception processingException) {
+ // while Java distinguishes checked vs unchecked
exceptions, other languages
+ // like Scala or Kotlin do not, and thus we need
to catch `Exception`
+ // (instead of `RuntimeException`) to work well
with those languages
+ if (processingExceptionHandler != null) {
+ final ErrorHandlerContext errorHandlerContext
= new DefaultErrorHandlerContext(
+ globalProcessorContext,
+ record.topic(),
+ record.partition(),
+ record.offset(),
+ record.headers(),
+ storeName,
Review Comment:
storeName is used as the processorNodeId parameter here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]